Performance Priorities Which Involve a Database Change

  1. Splitting the ProcessInstance table into ProcessInstance and BatchProcessInstance
    1. This drastically reduces the size of the ProcessInstance table and prevents row migration
    2. Do we remove the full path from the logFile column and assume it will always be under the WorkingDirectory?
  2. Partition the ProcessInstance table
    1. Partitioning requires inserting all rows into a new table.
    2. This may need to be modified to partition by reference to Stream.
    3. Partitioning by month seems to be the easiest way to eke out extra performance from current transactions (see the interval-partitioning sketch after this list).
    4. Partitioning by month could also boost performance of the web interface, specifically the task.jsp page, if we allow users to limit ProcessInstance statistics to the last week or month(s); that option currently isn't enabled.
  3. Partition the Stream
    1. Should probably be done before ProcessInstance partitioning
    2. Not sure what the best strategy is.
    3. One strategy is to partition based on Task, or possibly a reference to a root-level task. This gets a little messy.
    4. A second strategy is to partition based on RootStream. The problem with this is that it requires computing the ancestor stream at insertion time.
      1. However, we could add the column to the current database and start computing those values now. Later, we could insert those rows into a new table with interval partitions.
    5. Time-only partitioning requires all queries to specify a time range in order to limit the partitions searched.
  4. Add a RootStream column to the Stream table, enable RootStream locking.
    1. Enables fast Stream Tree locking
    2. When a process instance acquires a lock, it will no longer lock on its parent node, which is a stream. Instead, it will lock on the Stream row whose primary key is the RootStream of the PI's parent stream node. This prevents deadlocks, because each change to a process instance's status will preclude the modification of any other part of the tree (see the locking sketch after this list).
    3. Enables the canceling of top-level streams. Without this, it's impossible to know if another database connection has a lock on a child node of a top level stream.
    4. To maintain full backward compatibility, the stream table will need to be modified to populate this column, which may take a while for all streams.
    5. Should speed up rolling back
  5. Dead branch isLatest decrementing
    1. Currently: Stream Tree >= (Stream Tree where isLatest = 1) >= Stream Latest Tree, where each is a superset of the next.
    2. Proposal: When a branch is declared dead, all of its child nodes would have isLatest decremented by 1.
    3. This changes the relation to (Stream Tree where isLatest = 1) == Stream Latest Tree, which eliminates the recursive queries needed to find the latest tree.
    4. This requires more work when rolling back a stream, for instance, but that cost may be offset by the speed gained from reduced query time.
    5. May greatly benefit status changes across a stream execution tree.
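
A minimal sketch of the monthly partitioning idea from item 2 (all names, the cutoff date, and the trimmed column list are hypothetical). Oracle 11g interval partitioning creates a new partition automatically for each month of createdate values:

    -- Hedged sketch only; not the migration DDL.
    CREATE TABLE processinstance_bymonth_test (
        processinstance NUMBER NOT NULL,
        stream NUMBER NOT NULL,
        processingstatus VARCHAR(20 CHAR) NOT NULL,
        createdate TIMESTAMP NOT NULL,
        islatest NUMBER NOT NULL,
        PRIMARY KEY (processinstance)
    )
    PARTITION BY RANGE (createdate)
    INTERVAL (NUMTOYMINTERVAL(1, 'MONTH'))
    (PARTITION p_initial VALUES LESS THAN (TIMESTAMP '2014-01-01 00:00:00'));

And a sketch of the RootStream locking described in item 4, assuming the RootStream column exists and is populated (:pi_parent_stream is a hypothetical bind variable for the PI's parent stream node):

    -- Serialize tree-wide status changes on the single root row instead of the immediate parent.
    SELECT stream
      FROM stream
     WHERE stream = (SELECT rootstream FROM stream WHERE stream = :pi_parent_stream)
       FOR UPDATE;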

Backwards Compatibility Issues

  1. Active streams during transition
  2. Recently active streams during transition (streams which may be rolled back within a day or so)
  3. Streams that haven't been active for a week.

With weekly/monthly partitioning we can, for example, lock an older Stream partition and prevent modification by another process while we backfill computed values, say for the isLatest decrement.
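
As a rough illustration (the partition name is hypothetical), Oracle can take such a lock explicitly:

    -- Lock one older partition while backfilling; NOWAIT fails fast if another session holds it.
    LOCK TABLE stream PARTITION (p_2013_06) IN EXCLUSIVE MODE NOWAIT;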

Software Migration

  • Continue to move as much code as possible from stored procedures back into the pipeline so we can support Postgres and SQLite. This was started over the last few releases, and we're continuing on.
    • It also enables us to understand which operations are actually slow, and potentially modify them. Right now, stored procedures are a black box.
  • In conjunction with the schema changes and partitioning, this should still provide an overall performance boost.
  • Move all database statements to Java 7 for readability (a branch with most of these changes already exists).
  • Add lots of tests!

Testing

Database Migration Testing

  • I need to be able to revert the changes I will test in the current database for all Performance Priorities. This should probably be done with backup files of the current data on zglast-oracle03.
  • This is more exploratory in nature. Once I'm able to decide on schema changes that are performant, I will write scripts to perform them. Right now, I'm using Alembic and SQLAlchemy to aid in these migrations. One major benefit of Alembic is that the database tables are defined in Python, and DBMS-specific SQL is automatically generated for every database system we plan to support.

Server Software Testing

First off, I would like to expand support to three databases going forward:

  • Oracle 11g, possibly 12c in the future
  • PostgreSQL 9.2+ (and by proxy, the commercial product EnterpriseDB)
  • SQLite 3.8.3+ (3.8.3 adds recursive WITH statement support)

Now, a bit about Unit Testing and Integration Testing.

Unit testing should only test the code, though certain unit tests can/should be run against a database. Currently we use Oracle, but it could be convenient if, in addition to (or instead of) Oracle, we used SQLite, as it is file based and flexible. Right now, the majority of unit tests only test the creation and uploading of a new Task; many unit tests that aren't related to Task uploading are really some sort of integration test. Overall test coverage is pretty low and needs to be worked on.

Integration testing should test how certain parts of the code will interact with a database, and effectively simulate how the server would operate. Integration testing is loosely related to Database migration testing as well.

Integration testing is a lot harder than unit testing. Ideally, we would have virtual machines and Jenkins integration that could bring up and tear down multiple VMs with a well-defined environment. Another option is Amazon or some other VPS service, as they are well suited to creating a variety of environments easily. Integration testing should not be performed every time new code is checked in, but could be run on a daily basis. There's a lot of urgency in this migration, but it's a very high priority for me to make sure that any changes I make to the server software for the migration DO NOT preclude potential support for Postgres/SQLite. The best way I can ensure this is with an adequate integration testing environment, but I don't believe I will be able to set up all of this testing infrastructure in a reasonable amount of time.

Caveats of these database systems:

SQLite has no row-locking or partitioning.

PostgreSQL doesn't have friendly/automatic partitioning.

Oracle works best with CONNECT BY rather than recursive WITH statements.
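
For reference, the two idioms look like this (a minimal sketch over the Stream table; :root_stream is a hypothetical bind variable, and both forms walk the same hierarchy):

    -- Oracle-native hierarchical query
    SELECT stream, parentstream
      FROM stream
     START WITH stream = :root_stream
     CONNECT BY PRIOR stream = parentstream;

    -- ANSI recursive CTE (Postgres and SQLite spell it WITH RECURSIVE)
    WITH stream_tree (stream, parentstream) AS (
      SELECT stream, parentstream FROM stream WHERE stream = :root_stream
      UNION ALL
      SELECT s.stream, s.parentstream
        FROM stream s
        JOIN stream_tree st ON (s.parentstream = st.stream)
    )
    SELECT stream, parentstream FROM stream_tree;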

SQLite is a bit more complicated to set up, as it requires a native library. The version we need is pretty new, and the Java support currently requires building by hand ( https://bitbucket.org/xerial/sqlite-jdbc ), but I was able to do this successfully.

Caveats of current systems:

Current Oracle installs do not use TIMESTAMP WITH TIME ZONE. This is not a problem as long as the server is in the same time zone (PST) as the database, which is the case. There are no plans to change this.

 

Additional Definitions

Task: A node that contains other nodes (sub-Tasks) or leaf nodes of type Process.

Process: A leaf node of a Task that defines a process which can be run either as a batch job or internally to the pipeline server as a Jython scriptlet.

Stream: An execution instance node of a Task.

ProcessInstance: An execution instance of a Process inside a Stream.

Stream Tree: The tree, rooted at a top-level stream, of all streams, including those in "dead" branches. The root of a dead branch is the first node where isLatest is not equal to 1. Each node is a Stream.

Stream Latest Tree: The tree, rooted at a top-level stream, of all streams that are on the latest path.

Task Forest: All Stream Trees for a given root-level task.

Root Stream: The top-level stream for a given child, grandchild, etc. stream.



12 Comments

  1. Possible partitioning scheme for stream:

    drop table stream_test;
    drop table task_test;
    CREATE TABLE task_test (
        task NUMBER NOT NULL, 
        tasktype VARCHAR(20 CHAR) NOT NULL, 
        taskname VARCHAR(30 CHAR) NOT NULL, 
        version NUMBER NOT NULL, 
        revision NUMBER NOT NULL, 
        parenttask NUMBER, 
        roottask NUMBER, 
        taskstatus VARCHAR(20 CHAR) DEFAULT 'ACTIVE', 
        priority NUMBER, 
        lastactive TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 
        xmlsource CLOB, 
        PRIMARY KEY (task), 
        FOREIGN KEY(parenttask) REFERENCES task_test (task), 
        FOREIGN KEY(roottask) REFERENCES task_test (task), 
        FOREIGN KEY(taskstatus) REFERENCES taskstatus (taskstatus), 
        FOREIGN KEY(tasktype) REFERENCES tasktype (tasktype)
    )
    PARTITION BY HASH(roottask) PARTITIONS 128;
    
    CREATE TABLE stream_test (
        stream NUMBER NOT NULL, 
        task NUMBER NOT NULL, 
        parentstream NUMBER, 
        streamid NUMBER NOT NULL, 
        executionnumber NUMBER NOT NULL, 
        islatest NUMBER NOT NULL, 
        streamstatus VARCHAR(20 CHAR) NOT NULL, 
        createdate TIMESTAMP WITH TIME ZONE, 
        startdate TIMESTAMP WITH TIME ZONE, 
        enddate TIMESTAMP WITH TIME ZONE, 
        PRIMARY KEY (stream), 
        FOREIGN KEY(parentstream) REFERENCES stream_test (stream), 
        FOREIGN KEY(streamstatus) REFERENCES streamstatus (streamstatus), 
        CONSTRAINT fk_stream_task_cons FOREIGN KEY(task) REFERENCES task_test (task)
    )
    PARTITION BY REFERENCE(fk_stream_task_cons);
    
    
  2. Partitioning that way limits the ability to subpartition by date for any children. That might be irrelevant if indexes are created appropriately.

    It would take an estimated 1 hour to build a new stream table with all ancestor streams populated.

  3. The following will partition Task, Stream, and ProcessInstance by root-level task. It requires the root-level task (roottask) to be populated.

    drop table processinstance_test;
    drop table stream_test;
    drop table task_test;
    
    
    CREATE TABLE task_test (
    task NUMBER NOT NULL, 
    tasktype VARCHAR(20 CHAR) NOT NULL, 
    taskname VARCHAR(30 CHAR) NOT NULL, 
    version NUMBER NOT NULL, 
    revision NUMBER NOT NULL, 
    parenttask NUMBER, 
    roottask NUMBER, 
    taskstatus VARCHAR(20 CHAR) DEFAULT 'ACTIVE', 
    priority NUMBER, 
    lastactive TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 
    xmlsource CLOB, 
    PRIMARY KEY (task), 
    FOREIGN KEY(parenttask) REFERENCES task_test (task), 
    FOREIGN KEY(roottask) REFERENCES task_test (task), 
    FOREIGN KEY(taskstatus) REFERENCES taskstatus (taskstatus), 
    FOREIGN KEY(tasktype) REFERENCES tasktype (tasktype)
    )
    PARTITION BY HASH(roottask) PARTITIONS 512;
    
    CREATE TABLE stream_test (
    stream NUMBER NOT NULL, 
    task NUMBER NOT NULL, 
    parentstream NUMBER, 
    rootstream NUMBER,
    streamid NUMBER NOT NULL, 
    executionnumber NUMBER NOT NULL, 
    islatest NUMBER NOT NULL, 
    streamstatus VARCHAR(20 CHAR) NOT NULL, 
    createdate TIMESTAMP WITH TIME ZONE, 
    startdate TIMESTAMP WITH TIME ZONE, 
    enddate TIMESTAMP WITH TIME ZONE, 
    PRIMARY KEY (stream), 
    FOREIGN KEY(parentstream) REFERENCES stream_test (stream), 
    FOREIGN KEY(streamstatus) REFERENCES streamstatus (streamstatus), 
    CONSTRAINT fk_stream_task FOREIGN KEY(task) REFERENCES task_test (task)
    )
    PARTITION BY REFERENCE(fk_stream_task);
     
    CREATE TABLE processinstance_test (
    processinstance NUMBER NOT NULL, 
    process NUMBER NOT NULL, 
    stream NUMBER NOT NULL, 
    processingstatus VARCHAR(20 CHAR) NOT NULL, 
    createdate TIMESTAMP WITH TIME ZONE NOT NULL, 
    readydate TIMESTAMP WITH TIME ZONE, 
    queuedate TIMESTAMP WITH TIME ZONE, 
    submitdate TIMESTAMP WITH TIME ZONE, 
    startdate TIMESTAMP WITH TIME ZONE, 
    enddate TIMESTAMP WITH TIME ZONE, 
    cpusecondsused NUMBER, 
    memoryused NUMBER, 
    swapused NUMBER, 
    executionhost VARCHAR(30 CHAR), 
    exitcode NUMBER, 
    workingdir VARCHAR(256 CHAR), 
    logfile VARCHAR(256 CHAR), 
    executionnumber NUMBER NOT NULL, 
    autoretrynumber NUMBER DEFAULT '0 ' NOT NULL, 
    islatest NUMBER NOT NULL, 
    jobid VARCHAR(30 CHAR), 
    jobsite VARCHAR(10 CHAR), 
    PRIMARY KEY (processinstance), 
    FOREIGN KEY(jobsite) REFERENCES jobsite (jobsite), 
    FOREIGN KEY(process) REFERENCES process (process), 
    FOREIGN KEY(processingstatus) REFERENCES processingstatus (processingstatus), 
    constraint fk_pi_stream FOREIGN KEY(stream) REFERENCES stream_test (stream)
    )
    partition by reference(fk_pi_stream);
  4. I've encountered problems with the redo logs when copying tables over without NOLOGGING and the /*+ append */ hint, so we need to remember to use those.

    Secondly, some of the values could be precalculated and added to a new tablespace with GRANT SELECT privileges on zglast-oracle03, and we could join against it to help things move quicker. The following was pretty slow when tested on sca-oracle01 with ~12 million streams and ~15 million process instances:

     

    alter table batchprocessinstance_test nologging;
    alter table processinstance_test nologging;
    alter table stream_test nologging;
    alter table task_test nologging;
    
    drop table batchprocessinstance_test;
    drop table processinstance_test;
    drop table stream_test;
    drop table process_test;
    drop table task_test;
    
    CREATE TABLE task_test (
    task NUMBER NOT NULL, 
    tasktype VARCHAR(20 CHAR) NOT NULL, 
    taskname VARCHAR(30 CHAR) NOT NULL, 
    version NUMBER NOT NULL, 
    revision NUMBER NOT NULL, 
    parenttask NUMBER, 
    roottask NUMBER, 
    taskstatus VARCHAR(20 CHAR) DEFAULT 'ACTIVE', 
    priority NUMBER, 
    lastactive TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, 
    xmlsource CLOB, 
    PRIMARY KEY (task), 
    FOREIGN KEY(parenttask) REFERENCES task_test (task), 
    FOREIGN KEY(roottask) REFERENCES task_test (task)--, 
    --FOREIGN KEY(taskstatus) REFERENCES taskstatus (taskstatus), 
    --FOREIGN KEY(tasktype) REFERENCES tasktype (tasktype)
    )
    PARTITION BY HASH(roottask) PARTITIONS 512;
    CREATE TABLE stream_test (
    stream NUMBER NOT NULL, 
    task NUMBER NOT NULL, 
    parentstream NUMBER, 
    rootstream NUMBER,
    streamid NUMBER NOT NULL, 
    executionnumber NUMBER NOT NULL, 
    islatest NUMBER NOT NULL, 
    streamstatus VARCHAR(20 CHAR) NOT NULL, 
    createdate TIMESTAMP WITH TIME ZONE, 
    startdate TIMESTAMP WITH TIME ZONE, 
    enddate TIMESTAMP WITH TIME ZONE, 
    PRIMARY KEY (stream), 
    FOREIGN KEY(parentstream) REFERENCES stream_test (stream), 
    FOREIGN KEY(streamstatus) REFERENCES streamstatus (streamstatus), 
    CONSTRAINT fk_stream_task FOREIGN KEY(task) REFERENCES task_test (task)
    )
    PARTITION BY REFERENCE(fk_stream_task);
    CREATE TABLE processinstance_test (
        processinstance NUMBER NOT NULL, 
        process NUMBER NOT NULL, 
        stream NUMBER NOT NULL, 
        processingstatus VARCHAR(20 CHAR) NOT NULL, 
        createdate TIMESTAMP WITH TIME ZONE NOT NULL, 
        readydate TIMESTAMP WITH TIME ZONE, 
        queuedate TIMESTAMP WITH TIME ZONE, 
        submitdate TIMESTAMP WITH TIME ZONE, 
        startdate TIMESTAMP WITH TIME ZONE, 
        enddate TIMESTAMP WITH TIME ZONE, 
        exitcode NUMBER, 
        executionnumber NUMBER NOT NULL, 
        autoretrynumber NUMBER DEFAULT '0 ' NOT NULL, 
        islatest NUMBER NOT NULL, 
        PRIMARY KEY (processinstance), 
    --    FOREIGN KEY(jobsite) REFERENCES jobsite (jobsite), 
    --    FOREIGN KEY(process) REFERENCES process (process), 
    --    FOREIGN KEY(processingstatus) REFERENCES processingstatus (processingstatus), 
        constraint fk_pi_stream FOREIGN KEY(stream) REFERENCES stream_test (stream)
    )
    partition by reference(fk_pi_stream);
    CREATE TABLE batchprocessinstance_test (
        processinstance NUMBER NOT NULL, 
        jobsite VARCHAR(10 CHAR), 
        jobid VARCHAR(30 CHAR), 
        executionhost VARCHAR(30 CHAR), 
        workingdir VARCHAR(256 CHAR), 
        logfile VARCHAR(256 CHAR), 
        cpusecondsused NUMBER, 
        memoryused NUMBER, 
        swapused NUMBER, 
        FOREIGN KEY(processinstance) REFERENCES processinstance_test (processinstance)
    );
    
    alter table batchprocessinstance_test nologging;
    alter table processinstance_test nologging;
    alter table stream_test nologging;
    alter table task_test nologging;
     
    INSERT /*+ append */ into TASK_TEST (
            select t.task, t.tasktype, t.taskname, t.version, t.revision, t.parenttask, 
                    root_task_of(t.task), t.taskstatus, t.priority, t.lastactive, t.xmlsource 
            from DPF_PIPELINE.task t);
    
    CREATE TABLE PROCESS_TEST NOLOGGING AS SELECT * FROM DPF_PIPELINE.PROCESS;
    
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                    from DPF_PIPELINE.STREAM s);
    INSERT /*+ append */ INTO PROCESSINSTANCE_TEST
        (SELECT p.processinstance, p.process, p.stream, p.processingstatus, 
            p.createdate, p.readydate, p.queuedate, p.submitdate, p.startdate, p.enddate,
            p.exitcode, p.executionnumber, p.autoretrynumber, p.islatest from DPF_PIPELINE.PROCESSINSTANCE p);
    
    
    INSERT /*+ append */ INTO BATCHPROCESSINSTANCE_TEST
         (SELECT p.processinstance, p.jobsite, p.jobid, p.executionhost,
             p.workingdir, p.logfile, p.cpusecondsused, p.memoryused, p.swapused
             from DPF_PIPELINE.PROCESSINSTANCE p where p.jobid IS NOT NULL);
    
    
    --INSERT /*+ append */ INTO PROCESS_TEST (SELECT p.* from DPF_PIPELINE.PROCESS);
    
    
    --CREATE INDEX idx_fk_taskt_parenttask ON task_test (parenttask) nologging;
    --CREATE INDEX idx_fk_taskt_taskstatus ON task_test (taskstatus) nologging;
    --CREATE INDEX idx_fk_taskt_tasktype ON task_test (tasktype) nologging;
    --CREATE INDEX idx_taskt_taskname ON task_test (taskname) nologging;
    --CREATE UNIQUE INDEX unq_taskt ON task_test (taskname, version, revision, parenttask) nologging;
    --CREATE INDEX idx_fk_stream_test_strmstat ON stream_test (streamstatus) nologging;
    --CREATE INDEX idx_fk_stream_test_parentstrm ON stream_test (parentstream) nologging;
    --CREATE INDEX idx_fk_stream_test_rootstrm ON stream_test (rootstream) nologging;
    --CREATE INDEX idx_fk_stream_test_task ON stream_test (task) nologging;
    --CREATE INDEX idx_stream_test_ptaskltst ON stream_test (parentstream, task, islatest) nologging;
    --CREATE UNIQUE INDEX unq_stream ON stream_test (task, parentstream, streamid, executionnumber) nologging;
    --CREATE INDEX idx_fk_pi_test_process ON processinstance_test (process) nologging;
    --CREATE INDEX idx_fk_pi_test_prcsngstat ON processinstance_test (processingstatus) nologging;
    --CREATE INDEX idx_fk_pi_test_stream ON processinstance_test (stream) nologging;
    --CREATE INDEX idx_pi_test_strmprocltst ON processinstance_test (stream, process, islatest) nologging;
    --CREATE UNIQUE INDEX unq_pi_test ON processinstance_test (process, stream, executionnumber, autoretrynumber) nologging;
    --CREATE INDEX idx_bpi_test_workingdir ON batchprocessinstance (workingdir)
    --CREATE INDEX idx_fk_bpi_test_jobsite ON batchprocessinstance (jobsite)
    
    

     

    800 seconds to calculate root_stream_of for 13 million rows in the stream table.
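
    ROOT_STREAM_OF itself isn't shown on this page; purely as a hedged sketch (assuming top-level streams have parentstream = 0, which may not match the deployed function), it could look like:

        -- Hypothetical implementation for illustration only.
        CREATE OR REPLACE FUNCTION root_stream_of (p_stream IN NUMBER)
        RETURN NUMBER
        IS
            v_root NUMBER;
        BEGIN
            -- Walk up the parentstream chain and return the top-level ancestor.
            SELECT stream
              INTO v_root
              FROM stream
             WHERE parentstream = 0
             START WITH stream = p_stream
             CONNECT BY PRIOR parentstream = stream;
            RETURN v_root;
        END;
        /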

  5. Split of ProcessInstance: inserting into the processinstance_test table concurrently with batchprocessinstance_test.

    20:30:00 [INSERT - 16055625 row(s), 268.125 secs] Command processed
    ... 4 statement(s) executed, 16055625 row(s) affected, exec/fetch time: 268.371/0.000 sec [1 successful, 3 warnings, 0 errors]

     

    20:28:14 [INSERT - 14009767 row(s), 71.928 secs] Command processed
    ... 1 statement(s) executed, 14009767 row(s) affected, exec/fetch time: 71.928/0.000 sec [1 successful, 0 warnings, 0 errors]

     

  6. Local indexes created for several tablespaces. 

    3x Speedup with the following simulated front page SQL:

     SELECT SUM(1) "ALL",
            SUM(CASE WHEN PROCESSINGSTATUS='WAITING'    THEN 1 ELSE 0 END) "WAITING",
            SUM(CASE WHEN PROCESSINGSTATUS='READY'      THEN 1 ELSE 0 END) "READY",
            SUM(CASE WHEN PROCESSINGSTATUS='QUEUED'     THEN 1 ELSE 0 END) "QUEUED",
            SUM(CASE WHEN PROCESSINGSTATUS='SUBMITTED'  THEN 1 ELSE 0 END) "SUBMITTED",
            SUM(CASE WHEN PROCESSINGSTATUS='RUNNING'    THEN 1 ELSE 0 END) "RUNNING",
            SUM(CASE WHEN PROCESSINGSTATUS='SUCCESS'    THEN 1 ELSE 0 END) "SUCCESS",
            SUM(CASE WHEN PROCESSINGSTATUS='FAILED'     THEN 1 ELSE 0 END) "FAILED",
            SUM(CASE WHEN PROCESSINGSTATUS='TERMINATED' THEN 1 ELSE 0 END) "TERMINATED",
            SUM(CASE WHEN PROCESSINGSTATUS='CANCELING'  THEN 1 ELSE 0 END) "CANCELING",
            SUM(CASE WHEN PROCESSINGSTATUS='CANCELED'   THEN 1 ELSE 0 END) "CANCELED",
            SUM(CASE WHEN PROCESSINGSTATUS='SKIPPED'    THEN 1 ELSE 0 END) "SKIPPED",
            tt.lev, lpad(' ',1+24*(lev -1),' ')||tt.taskname taskname, tt.task,
            tt.version || '.' || tt.revision AS version,
            Initcap(pt.ProcessType) type, pt.processname, pt.process, displayorder
      FROM 
        ( SELECT   task,taskname,version,revision,level lev
            FROM TASK_TEST where roottask = 24121043 START WITH Task=24121043 CONNECT BY prior Task = ParentTask ) tt
      JOIN stream_test ss on (ss.task = tt.task)
      JOIN (select process, processingstatus, stream 
                    from PROCESSINSTANCE_TEST 
                    where islatest = 1 and stream in 
                            (select stream from stream_test where task in (select task from task_test where roottask = 24121043))) pit on (pit.stream = ss.stream)
      JOIN process_test pt ON (PIT.PROCESS = pt.PROCESS)
      WHERE ss.isLatest=1
      GROUP BY tt.lev, tt.task, tt.taskname, tt.version, tt.revision, pt.process, pt.PROCESSNAME, pt.displayorder, pt.processtype
      ORDER BY tt.task, pt.process

     

    Oracle doesn't always seem smart enough to understand the partition across joins, so that's why I did the (select * from PROCESSINSTANCE_TEST where stream....) subquery: it ultimately references the partition from the roottask, then one stream partition, then one processinstance partition (hopefully).

  7. Creating a partitioned index for the stream table on the rootstream makes the index scanning very quick.

    For example, we can find all latest child streams really quickly for a given root stream:

    CREATE INDEX idx_stream_t_rootstrm ON stream_test (rootstream, islatest) GLOBAL PARTITION BY HASH (ROOTSTREAM) PARTITIONS 128 NOLOGGING;
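
    A query of this shape (bind variable hypothetical) is what that index serves; with an equality predicate on rootstream, it should prune to a single index partition:

        SELECT stream, task, streamstatus
          FROM stream_test
         WHERE rootstream = :root_stream
           AND islatest = 1;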

  8. Modified the Pipeline web interface to use a new Common Table Expression (CTE) based recursion. Performance is roughly equivalent to the previous query: in some cases much better, and in a few cases ~1.3x slower when tested with SRS queries. In addition, the new query catches an edge case that wasn't being handled properly by the previous query.

    Some additional benefits may come in the future when partitioning is enabled.

       WITH 
           task_tree (task, parenttask, taskname, version, revision) AS 
           ( SELECT task, parenttask, taskname, version, revision FROM task WHERE task = 129203463
             UNION ALL
             SELECT jt.task, jt.parenttask, jt.taskname, jt.version, jt.revision
               FROM task_tree tt
               JOIN task jt ON (tt.task = jt.parenttask)
           ),
           sst AS (SELECT stream, parentstream, streamstatus, task, islatest 
                     FROM stream WHERE task IN (select task from task_tree)), 
           stream_tree ( stream, parentstream, streamstatus, task, lev ) AS
           ( SELECT   stream, parentstream, streamstatus, task, 1
               FROM stream
               WHERE task = (select task from task_tree where parenttask = 0) AND islatest = 1
             UNION ALL
             SELECT   sst.stream, sst.parentstream, sst.streamstatus, sst.task, lev+1
               FROM stream_tree st
               JOIN sst ON (st.stream = sst.parentstream)
               WHERE islatest = 1
           )
       select   SUM(1) "ALL",
           SUM(case when PROCESSINGSTATUS='WAITING' then 1 else 0 end) "WAITING",                        
           SUM(case when PROCESSINGSTATUS='READY' then 1 else 0 end) "READY",                        
           SUM(case when PROCESSINGSTATUS='QUEUED' then 1 else 0 end) "QUEUED",                        
           SUM(case when PROCESSINGSTATUS='SUBMITTED' then 1 else 0 end) "SUBMITTED",                        
           SUM(case when PROCESSINGSTATUS='RUNNING' then 1 else 0 end) "RUNNING",                        
           SUM(case when PROCESSINGSTATUS='SUCCESS' then 1 else 0 end) "SUCCESS",                        
           SUM(case when PROCESSINGSTATUS='FAILED' then 1 else 0 end) "FAILED",                        
           SUM(case when PROCESSINGSTATUS='TERMINATED' then 1 else 0 end) "TERMINATED",                        
           SUM(case when PROCESSINGSTATUS='CANCELING' then 1 else 0 end) "CANCELING",                        
           SUM(case when PROCESSINGSTATUS='CANCELED' then 1 else 0 end) "CANCELED",                        
           SUM(case when PROCESSINGSTATUS='SKIPPED' then 1 else 0 end) "SKIPPED",                        
           lev, lpad(' ',1+24*(lev -1),' ')|| tt.taskname  taskname, st.task, tt.version || '.' || tt.revision as version, Initcap(prt.ProcessType) type, prt.processname, pt.process, prt.displayorder
           FROM stream_tree st
           join processinstance pt on (pt.stream = st.stream)
           join task tt on (st.task = tt.task)
           join process prt on (pt.process = prt.process)
           where pt.islatest = 1
           GROUP BY lev, st.task, tt.taskname, tt.version, tt.revision, pt.process, prt.PROCESSNAME, prt.displayorder, prt.processtype
           ORDER BY st.task, pt.process
    
    
  9. It's been decided that the potential benefits of REFERENCE partitioning are hard to materialize properly. I've switched to ROOTSTREAM-based partitioning, which should provide more runtime benefits, hopefully with some improvement in the web application, notably the task.jsp page.

    Of note, three things are known to be issues:

    1. Not enough tablespace, so we must create several new tablespace files immediately upon migrating.

    2. Possible problems with the ROOT_STREAM_OF function

    3. Permissions with CREATE TRIGGER

     

     

    The following is a candidate script for the glast_dp_test migration:

     

    -- !! IMPORTANT: Fix any functions for ROOT_TASK_OF, ROOT_STREAM_OF
    -- !! IMPORTANT: Add a few more database files if necessary:
    --ALTER TABLESPACE 
    --   glast_dp_test 
    --ADD DATAFILE 
    --   '/ora01/oracle/oradata/booktst_users_02.dbf' 
    --size 100m
    
    ALTER TABLE batchprocessinstance_test NOLOGGING;
    ALTER TABLE processinstance_test NOLOGGING;
    ALTER TABLE stream_test NOLOGGING;
    DROP TABLE batchprocessinstance_test;
    DROP TABLE processinstance_test;
    DROP TABLE stream_test;
    CREATE TABLE stream_test (
        stream NUMBER NOT NULL, 
        task NUMBER NOT NULL, 
        parentstream NUMBER, 
        rootstream NUMBER,
        streamid NUMBER NOT NULL, 
        executionnumber NUMBER NOT NULL, 
        islatest NUMBER NOT NULL, 
        streamstatus VARCHAR(20 CHAR) NOT NULL, 
        createdate TIMESTAMP WITH TIME ZONE,
        startdate TIMESTAMP WITH TIME ZONE,
        enddate TIMESTAMP WITH TIME ZONE,
        PRIMARY KEY (stream)--, 
    --    FOREIGN KEY(parentstream) REFERENCES stream_test (stream), 
    --    FOREIGN KEY(streamstatus) REFERENCES streamstatus (streamstatus)
    --    CONSTRAINT fk_stream_t_task FOREIGN KEY(task) REFERENCES task(task)
    )
    PARTITION BY HASH(rootstream) PARTITIONS 512;
    CREATE TABLE processinstance_test (
        processinstance NUMBER NOT NULL, 
        process NUMBER NOT NULL, 
        stream NUMBER NOT NULL, 
        processingstatus VARCHAR(20 CHAR) NOT NULL, 
        createdate TIMESTAMP WITH TIME ZONE NOT NULL, 
        readydate TIMESTAMP WITH TIME ZONE,
        queuedate TIMESTAMP WITH TIME ZONE,
        submitdate TIMESTAMP WITH TIME ZONE,
        startdate TIMESTAMP WITH TIME ZONE,
        enddate TIMESTAMP WITH TIME ZONE,
        exitcode NUMBER, 
        jobsite VARCHAR(10 CHAR), 
        jobid VARCHAR(30 CHAR), 
        executionnumber NUMBER NOT NULL, 
        autoretrynumber NUMBER DEFAULT '0' NOT NULL, 
        islatest NUMBER NOT NULL, 
        PRIMARY KEY (processinstance), 
    --    FOREIGN KEY(process) REFERENCES process_test (process), 
    --    FOREIGN KEY(processingstatus) REFERENCES processingstatus (processingstatus), 
    --    FOREIGN KEY(jobsite) REFERENCES jobsite (jobsite), 
        constraint fk_pi_t_stream FOREIGN KEY(stream) REFERENCES stream_test (stream)
    )
    PARTITION BY REFERENCE(fk_pi_t_stream);
    CREATE TABLE batchprocessinstance_test (
        processinstance NUMBER NOT NULL, 
        executionhost VARCHAR(30 CHAR), 
        workingdir VARCHAR(256 CHAR), 
        logfile VARCHAR(256 CHAR), 
        cpusecondsused NUMBER, 
        memoryused NUMBER, 
        swapused NUMBER--, 
    --    FOREIGN KEY(processinstance) REFERENCES processinstance_test (processinstance)
    );
    ALTER TABLE batchprocessinstance_test NOLOGGING;
    ALTER TABLE processinstance_test NOLOGGING;
    ALTER TABLE stream_test NOLOGGING;
    -- !! IMPORTANT: Run these in parallel! They aren't likely to overlap partitions too much.
    -- Allow 4000 seconds for the following:
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                   from /*user*/STREAM s where s.task < 23126920);
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                   from /*user*/STREAM s where s.task >= 23126920 and s.task < 58802259);
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                   from /*user*/STREAM s where s.task >= 58802259 and s.task < 83665816);
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                   from /*user*/STREAM s where s.task >= 83665816 and s.task < 100892797);
    INSERT /*+ append */ INTO STREAM_TEST (select s.stream, s.task, s.parentstream, root_stream_of(s.stream), 
                                    s.streamid, s.executionnumber, s.islatest, s.streamstatus, 
                                    s.createdate, s.startdate, s.enddate
                                   from /*user*/STREAM s where s.task >= 100892797);
    -- Use System parallelization
    ALTER SESSION ENABLE PARALLEL DML;
    -- Allow 8 minutes for the following
    -- This should take <8M disk reads.
    INSERT /*+ append parallel(20) */ INTO PROCESSINSTANCE_TEST
        (SELECT p.processinstance, p.process, p.stream, p.processingstatus, 
            p.createdate, p.readydate, p.queuedate, p.submitdate, p.startdate, p.enddate,
            p.exitcode, p.jobsite, p.jobid, p.executionnumber, p.autoretrynumber, p.islatest from /*user*/PROCESSINSTANCE p);
    INSERT /*+ append parallel(20) */ INTO BATCHPROCESSINSTANCE_TEST
         (SELECT p.processinstance, p.executionhost,
             p.workingdir, p.logfile, p.cpusecondsused, p.memoryused, p.swapused
             from /*user*/PROCESSINSTANCE p where p.jobid is not null);
    -- !!IMPORTANT - Create new Triggers
    ALTER TABLE PROCESSINSTANCE RENAME TO PROCESSINSTANCE_OLD;
    ALTER TABLE STREAM RENAME TO STREAM_OLD;
    
    ALTER TABLE STREAM_OLD RENAME CONSTRAINT "BIN$bJSVSlXrJoHgRAAUT5fcQA==$0" TO "PK_STREAM_OLD";
    ALTER TABLE PROCESSINSTANCE_OLD RENAME CONSTRAINT "BIN$bJSVSlW7JoHgRAAUT5fcQA==$0" TO "PK_PI_OLD";
    ALTER TRIGGER STREAM_PK RENAME TO STREAM_PK_OLD;
    ALTER TRIGGER PROCESSINST_PK RENAME TO PROCESSINST_PK_OLD;
    ALTER TABLE BATCHPROCESSINSTANCE_TEST RENAME TO BATCHPROCESSINSTANCE;
    ALTER TABLE PROCESSINSTANCE_TEST RENAME TO PROCESSINSTANCE;
    ALTER TABLE STREAM_TEST RENAME TO STREAM;
    /*
    -- REVERT
    ALTER TABLE STREAM_OLD RENAME CONSTRAINT "PK_STREAM_OLD" TO "BIN$bJSVSlXrJoHgRAAUT5fcQA==$0";
    ALTER TABLE PROCESSINSTANCE_OLD RENAME CONSTRAINT "PK_PI_OLD" TO "BIN$bJSVSlW7JoHgRAAUT5fcQA==$0";
    ALTER TRIGGER STREAM_PK_OLD RENAME TO STREAM_PK;
    ALTER TRIGGER PROCESSINST_PK_OLD RENAME TO PROCESSINST_PK;
    ALTER TABLE BATCHPROCESSINSTANCE RENAME TO BATCHPROCESSINSTANCE_TEST;
    ALTER TABLE PROCESSINSTANCE RENAME TO PROCESSINSTANCE_TEST;
    ALTER TABLE STREAM RENAME TO STREAM_TEST;
    ALTER TABLE PROCESSINSTANCE_OLD RENAME TO PROCESSINSTANCE;
    ALTER TABLE STREAM_OLD RENAME TO STREAM;
    */
    /*
    -- !!IMPORTANT - Create these!
    create or replace trigger Stream_pk
      before insert
        on stream
        for each row
    begin
      select pipeline_seq.nextval
         into :new.stream
         from dual;
      :new.ROOTSTREAM := CASE WHEN :new.ROOTSTREAM = 0 THEN :new.STREAM ELSE :new.ROOTSTREAM END;
    end;
    /
    create or replace trigger Processinst_pk
      before insert
        on processinstance
        for each row
    begin
      select pipeline_seq.nextval
        into :new.processinstance
        from dual;
      :new.createdate:= sysdate;
    end;
    */
    -- Rename old constraints
    ALTER TABLE "STREAMFILEREF" RENAME CONSTRAINT "FK_SF_STREAM" TO "FK_SF_STREAM_OLD";
    ALTER TABLE "STREAMVAR" RENAME CONSTRAINT "FK_STREAMIVARPI" TO "FK_STREAMIVARPI_OLD";
    ALTER TABLE "DATASETVERSION" RENAME CONSTRAINT "FK_DSV_PI" TO "FK_DSV_PI_OLD";
    ALTER TABLE "LOG" RENAME CONSTRAINT "FK_LOG_PI" TO "FK_LOG_PI_OLD";
    ALTER TABLE "PROCESSINSTANCEFILEREF" RENAME CONSTRAINT "FK_PIF_PROCESSINSTANCE" TO "FK_PIF_PROCESSINSTANCE_OLD";
    ALTER TABLE "PROCESSINSTANCEVAR" RENAME CONSTRAINT "FK_PIVARPI" TO "FK_PIVARPI_OLD";
    ALTER TABLE "HISTORYRUNS" RENAME CONSTRAINT "FK_PROCESSINSTANCE" TO "FK_PROCESSINSTANCE_OLD";
    -- Disable old constraints
    ALTER TABLE "STREAMFILEREF" DISABLE CONSTRAINT "FK_SF_STREAM_OLD";
    ALTER TABLE "STREAMVAR" DISABLE CONSTRAINT "FK_STREAMIVARPI_OLD";
    ALTER TABLE "DATASETVERSION" DISABLE CONSTRAINT "FK_DSV_PI_OLD";
    ALTER TABLE "LOG" DISABLE CONSTRAINT "FK_LOG_PI_OLD";
    ALTER TABLE "PROCESSINSTANCEFILEREF" DISABLE CONSTRAINT "FK_PIF_PROCESSINSTANCE_OLD";
    ALTER TABLE "PROCESSINSTANCEVAR" DISABLE CONSTRAINT "FK_PIVARPI_OLD";
    ALTER TABLE "HISTORYRUNS" DISABLE CONSTRAINT "FK_PROCESSINSTANCE_OLD";
    -- Create new constraints
    ALTER TABLE "STREAMFILEREF" ADD CONSTRAINT "FK_SF_STREAM" FOREIGN KEY (STREAM) REFERENCES STREAM(STREAM) ON DELETE CASCADE ENABLE NOVALIDATE;
    ALTER TABLE "STREAMVAR" ADD CONSTRAINT "FK_STREAMIVARPI" FOREIGN KEY (STREAM) REFERENCES STREAM(STREAM) ON DELETE CASCADE ENABLE NOVALIDATE;
    ALTER TABLE "DATASETVERSION" ADD CONSTRAINT "FK_DSV_PI" FOREIGN KEY (PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ON DELETE SET NULL ENABLE NOVALIDATE;
    ALTER TABLE "LOG" ADD CONSTRAINT "FK_LOG_PI" FOREIGN KEY (PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ON DELETE SET NULL ENABLE NOVALIDATE;
    ALTER TABLE "PROCESSINSTANCEFILEREF" ADD CONSTRAINT "FK_PIF_PROCESSINSTANCE" FOREIGN KEY (PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ON DELETE CASCADE ENABLE NOVALIDATE;
    ALTER TABLE "PROCESSINSTANCEVAR" ADD CONSTRAINT "FK_PIVARPI" FOREIGN KEY (PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ON DELETE CASCADE ENABLE NOVALIDATE;
    ALTER TABLE "HISTORYRUNS" ADD CONSTRAINT "FK_PROCESSINSTANCE" FOREIGN KEY (PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ON DELETE CASCADE ENABLE NOVALIDATE;
    -- Don't forget adding the constraints we omitted for the table copy
    ALTER TABLE STREAM ADD CONSTRAINT FK_STREAM_T_TASK FOREIGN KEY(TASK) REFERENCES TASK(TASK) ON DELETE CASCADE ENABLE NOVALIDATE;
    ALTER TABLE STREAM ADD FOREIGN KEY(PARENTSTREAM) REFERENCES STREAM(STREAM) ENABLE NOVALIDATE;
    ALTER TABLE STREAM ADD FOREIGN KEY(ROOTSTREAM) REFERENCES STREAM(STREAM) ENABLE NOVALIDATE;
    ALTER TABLE STREAM ADD FOREIGN KEY(STREAMSTATUS) REFERENCES STREAMSTATUS(STREAMSTATUS) ENABLE NOVALIDATE;
    ALTER TABLE PROCESSINSTANCE ADD FOREIGN KEY(PROCESS) REFERENCES PROCESS(PROCESS) ENABLE NOVALIDATE; -- 120 seconds
    ALTER TABLE PROCESSINSTANCE ADD FOREIGN KEY(PROCESSINGSTATUS) REFERENCES PROCESSINGSTATUS(PROCESSINGSTATUS) ENABLE NOVALIDATE;
    ALTER TABLE BATCHPROCESSINSTANCE ADD CONSTRAINT "FK_BATCH_PI" FOREIGN KEY(PROCESSINSTANCE) REFERENCES PROCESSINSTANCE(PROCESSINSTANCE) ENABLE NOVALIDATE; --120 seconds
    
    -- Drop old constraints
    --ALTER TABLE "STREAMFILEREF" DROP CONSTRAINT "FK_SF_STREAM_OLD";
    --ALTER TABLE "STREAMVAR" DROP CONSTRAINT "FK_STREAMIVARPI_OLD";
    --ALTER TABLE "DATASETVERSION" DROP CONSTRAINT "FK_DSV_PI_OLD";
    --ALTER TABLE "LOG" DROP CONSTRAINT "FK_LOG_PI_OLD";
    --ALTER TABLE "PROCESSINSTANCEFILEREF" DROP CONSTRAINT "FK_PIF_PROCESSINSTANCE_OLD";
    --ALTER TABLE "PROCESSINSTANCEVAR" DROP CONSTRAINT "FK_PIVARPI_OLD";
    --ALTER TABLE "HISTORYRUNS" DROP CONSTRAINT "FK_PROCESSINSTANCE_OLD";
    -- The following should take under 10 minutes
    /*
    CREATE INDEX idx_fk_stream_t_strmstat ON stream (streamstatus) NOLOGGING; -- GLOBAL PARTITION BY HASH(streamstatus) PARTITIONS 128 NOLOGGING;
    CREATE INDEX idx_fk_stream_t_parentstrm ON stream (parentstream) NOLOGGING;
    CREATE INDEX idx_fk_stream_t_task ON stream (task) NOLOGGING;
    CREATE INDEX idx_stream_t_rtstrm ON stream (rootstream, islatest) LOCAL NOLOGGING; -- For just getting everything -- GLOBAL PARTITION BY HASH (ROOTSTREAM) PARTITIONS 128 NOLOGGING; 
    CREATE INDEX idx_stream_t_rtparltst ON stream (rootstream, parentstream, islatest) LOCAL NOLOGGING; -- For recursive queries -- GLOBAL PARTITION BY HASH (ROOTSTREAM) PARTITIONS 128 NOLOGGING; 
    CREATE INDEX idx_stream_t_partaskltst ON stream (parentstream, task, islatest) NOLOGGING;
    CREATE UNIQUE INDEX unq_stream_t ON stream (task, parentstream, streamid, executionnumber) NOLOGGING;
    CREATE INDEX idx_fk_pi_t_process ON processinstance (process) NOLOGGING;
    CREATE INDEX idx_fk_pi_t_prcsngstat ON processinstance (processingstatus) NOLOGGING;
    CREATE INDEX idx_fk_pi_t_stream ON processinstance (stream) NOLOGGING;
    CREATE INDEX idx_pi_t_strmprocltst ON processinstance (stream, process, islatest) NOLOGGING;
    CREATE UNIQUE INDEX unq_pi_t ON processinstance (process, stream, executionnumber, autoretrynumber) NOLOGGING;
    CREATE INDEX idx_bpi_test_workingdir ON batchprocessinstance (workingdir);
    */
    
    --begin
    --
    --dbms_stats.gather_table_stats( 
    --ownname=> 'SRS_PIPELINE_TEST', 
    --tabname=> 'PROCESSINSTANCE_TEST' , 
    --estimate_percent=> DBMS_STATS.AUTO_SAMPLE_SIZE, 
    --cascade=> DBMS_STATS.AUTO_CASCADE, 
    --degree=> 4, 
    --no_invalidate=> DBMS_STATS.AUTO_INVALIDATE, 
    --granularity=> 'AUTO', 
    --method_opt=> 'FOR ALL COLUMNS SIZE AUTO');
    --
    --end;
    
    ALTER TABLE batchprocessinstance LOGGING;
    ALTER TABLE processinstance LOGGING;
    ALTER TABLE stream LOGGING;
    
    
  10. Query for determining the streams which are no longer in the latest path

    SELECT STREAM, PARENTSTREAM, ISLATEST, CASE WHEN INSTR( SYS_CONNECT_BY_PATH(islatest, '.'), '0') > 0 THEN 0 ELSE 1 END latestpath, LEVEL
     FROM STREAM
     CONNECT BY PRIOR STREAM = PARENTSTREAM

     

    Determining the actual streams that need to be updated (or in this case, the count):

    select count(stream) from (
    SELECT STREAM, PARENTSTREAM, ISLATEST, CASE WHEN INSTR( SYS_CONNECT_BY_PATH(islatest, '.'), '0') > 0 THEN 0 ELSE 1 END latestpath, LEVEL
     FROM STREAM
     CONNECT BY PRIOR STREAM = PARENTSTREAM
     START WITH parentstream = 0
    ) where ISLATEST = 1 and LATESTPATH = 0;
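
    As a hedged sketch (the real migration might batch this per partition), the corresponding backfill for the isLatest decrement proposed above could be a single UPDATE driven by the same hierarchy query:

        UPDATE stream
           SET islatest = islatest - 1
         WHERE stream IN (
                 SELECT stream FROM (
                   SELECT stream, islatest,
                          CASE WHEN INSTR(SYS_CONNECT_BY_PATH(islatest, '.'), '0') > 0
                               THEN 0 ELSE 1 END latestpath
                     FROM stream
                    CONNECT BY PRIOR stream = parentstream
                    START WITH parentstream = 0
                 )
                 WHERE islatest = 1 AND latestpath = 0
               );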

     

     

  11. Notes on my procedures:

    1. mv monitor, stop pipeline.

    2. ~bvan/download-maven-deps.sh -a srs:org-srs-pipeline-server -v 1.5-SNAPSHOT
    3. ~bvan/download-maven-deps.sh -a srs:org-srs-pipeline-client -v 1.2-SNAPSHOT
    -- copy group manager 1.21
    -- copy sqlj-1.2.jar from another directory
    4. wget maven projects
    5. modify run-pipeline-1.5.csh
    -- add dependency for sqlj-1.2.csh in plugin path
    6. modify pipeline executable script
    -- Check directories
    7. perform SQL migration

    8. run islatest migration completion

  12. We should optimize the pipeline and web application to get rid of the PII.GetStreamIsLatestPath nonsense after the move.