-- Moves one process instance to a new processing status.
--
-- Parameters:
--   ProcessInstance_in  - key of the ProcessInstance row to change.
--   ProcessingStatus_in - the requested new status.
--   transitionTime_in   - time of the transition; stored as SubmitDate,
--                         StartDate or EndDate depending on the new status.
--
-- Errors:
--   Raises invalid_status_transition_ex through RAISE_APPLICATION_ERROR when
--   GetStateTransitionValid returns 0 for (current status, new status,
--   process type).  The SELECT ... INTO lookups raise NO_DATA_FOUND when the
--   process instance or its process row does not exist.
PROCEDURE SetProcessInstanceStatus(ProcessInstance_in IN int, ProcessingStatus_in IN VARCHAR2, transitionTime_in IN TimeStamp)
IS
   ProcessInstanceRow_out ProcessInstance%ROWTYPE;
   ProcessType_out        ProcessType.ProcessType%TYPE;
   StreamStatus_out       StreamStatus.StreamStatus%TYPE;
BEGIN
   -- Load the current row and the type of the process it belongs to.
   select * into ProcessInstanceRow_out
     from ProcessInstance
    where ProcessInstance = ProcessInstance_in;

   select ProcessType into ProcessType_out
     from Process
    where Process = ProcessInstanceRow_out.Process;

   -- Reject transitions that the validation function reports as invalid.
   -- RAISE_APPLICATION_ERROR never returns, so nothing may follow it here.
   IF 0 = GetStateTransitionValid(ProcessInstanceRow_out.ProcessingStatus, ProcessingStatus_in, ProcessType_out) THEN
      RAISE_APPLICATION_ERROR(invalid_status_transition_ex,
         'Not a valid state transition from ' || ProcessInstanceRow_out.ProcessingStatus || ' to ' || ProcessingStatus_in || ' for a process of type ' || ProcessType_out || '.');
   END IF;

   update ProcessInstance
      set ProcessingStatus = ProcessingStatus_in
    where ProcessInstance = ProcessInstance_in;

   -- Record the transition time in the column that matches the new status.
   -- Statuses other than submitted/running/success/failed record no time.
   IF ps_submitted = ProcessingStatus_in THEN
      update ProcessInstance
         set SubmitDate = transitionTime_in
       where ProcessInstance = ProcessInstance_in;
   ELSIF ps_running = ProcessingStatus_in THEN
      update ProcessInstance
         set StartDate = transitionTime_in
       where ProcessInstance = ProcessInstance_in;
   ELSIF ps_success = ProcessingStatus_in OR ps_failed = ProcessingStatus_in THEN
      update ProcessInstance
         set EndDate = transitionTime_in
       where ProcessInstance = ProcessInstance_in;
   END IF;

   CheckProcessDependents(ProcessInstanceRow_out.Process, ProcessInstanceRow_out.Stream, ProcessingStatus_in);

   -- Called for its effect on the stream; the returned status is not used
   -- here, but PL/SQL requires a function result to be assigned.
   StreamStatus_out := SP_STREAM.calculateStreamStatus(ProcessInstanceRow_out.Stream);
END;
/**
 * Calls the stored procedure <code>DATA_ACCESS.SetProcessInstanceStatus</code>
 * to move a process instance to a new processing status.
 *
 * @param processInstance     key of the process instance, bound as parameter 1
 * @param newProcessingStatus requested status; its <code>toString()</code> value is bound as parameter 2
 * @param transitionTime      time of the transition, bound as parameter 3
 * @throws InvalidStatusTransitionException when the database error code equals
 *         <code>InvalidStatusTransitionException.errorCode</code>; wraps the original exception
 * @throws SQLException for every other database failure, rethrown unchanged
 */
public void setProcessInstanceStatus(int processInstance, ProcessingStatus newProcessingStatus, Timestamp transitionTime)
      throws InvalidStatusTransitionException, SQLException
{
   final CallableStatement call = connection.prepareCall("{call DATA_ACCESS.SetProcessInstanceStatus(?,?,?)}");
   try
   {
      call.setInt(1, processInstance);
      call.setString(2, newProcessingStatus.toString());
      call.setTimestamp(3, transitionTime);
      call.execute();
   }
   catch (SQLException failure)
   {
      // Anything but the dedicated "invalid transition" code goes to the caller untouched.
      if (failure.getErrorCode() != InvalidStatusTransitionException.errorCode)
      {
         throw failure;
      }
      throw new InvalidStatusTransitionException(failure);
   }
   finally
   {
      // The statement is released on both the success and the failure path.
      call.close();
   }
}
package org.glast.pipeline.server.sql.sp; public class SPStream { // Package-specific Exception types: public static final int invalid_stream_ex = -20026; // ProcessingStatus Enumeration Constants: public static final String ps_waiting = "WAITING"; public static final String ps_ready = "READY"; public static final String ps_queued = "QUEUED"; public static final String ps_submitted = "SUBMITTED"; public static final String ps_running = "RUNNING"; public static final String ps_success = "SUCCESS"; public static final String ps_failed = "FAILED"; public static final String ps_terminated = "TERMINATED"; public static final String ps_canceled = "CANCELED"; public static final String ps_skipped = "SKIPPED"; // StreamStatus Enumeration Constants: public static final String ss_waiting = "WAITING"; public static final String ss_queued = "QUEUED"; public static final String ss_running = "RUNNING"; public static final String ss_success = "SUCCESS"; public static final String ss_failed = "FAILED"; public static final String ss_terminated = "TERMINATED"; public static final String ss_canceled = "CANCELED"; public static final String ss_terminating = "TERMINATING"; public static final String ss_canceling = "CANCELING"; private static HashSet processingStates = new HashSet(); static { processingStates.add(ps_waiting); processingStates.add(ps_ready); processingStates.add(ps_queued); processingStates.add(ps_submitted); processingStates.add(ps_running); processingStates.add(ps_success); processingStates.add(ps_failed); processingStates.add(ps_terminated); processingStates.add(ps_canceled); processingStates.add(ps_terminated); processingStates.add(ps_skipped); } private static HashSet finalProcessingStates = new HashSet(); static { finalProcessingStates.add(ps_terminated); finalProcessingStates.add(ps_canceled); finalProcessingStates.add(ps_success); finalProcessingStates.add(ps_failed); finalProcessingStates.add(ps_skipped); } private static HashSet streamStates = new HashSet(); 
static { streamStates.add(ss_waiting); streamStates.add(ss_queued); streamStates.add(ss_running); streamStates.add(ss_success); streamStates.add(ss_failed); streamStates.add(ss_terminating); streamStates.add(ss_terminated); streamStates.add(ss_canceling); streamStates.add(ss_canceled); } private static HashSet finalStreamStates = new HashSet(); static { finalStreamStates.add(ss_terminated); finalStreamStates.add(ss_canceled); finalStreamStates.add(ss_success); finalStreamStates.add(ss_failed); } private static boolean allComplete(Map procStatMap, Map streamStatMap) { for (Iterator i = processingStates.iterator(); i.hasNext();) { String state = (String)i.next(); int count = ((Integer)procStatMap.get(state)).intValue(); if ( !finalProcessingStates.contains(state) && (count != 0) ) return false; } for (Iterator i = streamStates.iterator(); i.hasNext();) { String state = (String)i.next(); int count = ((Integer)streamStatMap.get(state)).intValue(); if ( !finalStreamStates.contains(state) && (count != 0) ) return false; } return true; } private static boolean all(String key, Map map) { int total = 0; for (Iterator i = map.values().iterator(); i.hasNext();) { total += ((Integer)i.next()).intValue(); } return ((Integer)map.get(key)).intValue() == total; } private static boolean any(String key, Map map) { return ((Integer)map.get(key)).intValue() != 0; } public static String calculateStreamStatus(int stream) throws ClassNotFoundException { String retVal = ss_waiting; Connection connection = null; // Database connection object ResultSet rset = null; int parentStream = 0; try { // Get a Default Database Connection using Server Side JDBC Driver. 
// Note : This class will be loaded on the Database Server and hence use a // Server Side JDBC Driver to get default Connection to Database if (System.getProperty("java.vendor").toLowerCase().indexOf("oracle") != -1) { // we're running as a stored procedure, connect locally: connection = DriverManager.getConnection("jdbc:default:connection"); } else { // we're running outside the DB, connect via standard oracle jdbc driver: Class.forName("oracle.jdbc.driver.OracleDriver"); connection = DriverManager.getConnection("jdbc:oracle:thin:@glast-oracle02.slac.stanford.edu:1521:GLASTDEV","GLAST_DP_TEST","BT33%Q9]MU"); } // // Get information about current stream // String currentStatus; PreparedStatement stmt = connection.prepareStatement( "select * " + "from Stream " + "where Stream = ? " ); stmt.setInt(1,stream); rset = stmt.executeQuery(); // Execute the query, get Resultset if (rset.next()) { currentStatus = rset.getString("STREAMSTATUS"); parentStream = rset.getInt("PARENTSTREAM"); } else { throw new SQLException("No such stream", "", invalid_stream_ex); } stmt.close(); // // Get a count of the Processes in each state for this Stream: // stmt = connection.prepareStatement( "select ProcessingStatus, count(ProcessingStatus) AS COUNT " + "from ProcessInstance " + "where Stream = ? " + "group by ProcessingStatus " ); stmt.setInt(1,stream); rset = stmt.executeQuery(); // Execute the query, get Resultset Map procStatCounts = new HashMap(); for (Iterator i = processingStates.iterator(); i.hasNext();) procStatCounts.put((String)i.next(), new Integer(0)); while (rset.next()) { procStatCounts.put(rset.getString("PROCESSINGSTATUS"), new Integer(rset.getInt("COUNT"))); } stmt.close(); // // Get a count of the sub-Streams in each state for this Stream: // stmt = connection.prepareStatement( "select StreamStatus, count(StreamStatus) AS COUNT " + "from Stream " + "where ParentStream = ? 
" + "group by StreamStatus" ); stmt.setInt(1, stream); rset = stmt.executeQuery(); Map subStreamStatCounts = new HashMap(); for (Iterator i = streamStates.iterator(); i.hasNext();) subStreamStatCounts.put((String)i.next(), new Integer(0)); while (rset.next()) { subStreamStatCounts.put(rset.getString("STREAMSTATUS"), new Integer(rset.getInt("COUNT"))); } stmt.close(); if (currentStatus == ss_terminating) { if (allComplete(procStatCounts, subStreamStatCounts)) { retVal = ss_terminated; } else { retVal = ss_terminating; } } else if (currentStatus == ss_canceling) { if (allComplete(procStatCounts, subStreamStatCounts)) { retVal = ss_canceled; } else { retVal = ss_canceling; } } else if ( any(ps_running, procStatCounts) || any(ss_running, subStreamStatCounts) ) { retVal = ss_running; } else if ( any(ps_queued, procStatCounts) || any(ps_submitted, procStatCounts) || any(ss_queued, subStreamStatCounts) ) { retVal = ss_queued; } else if ( any(ps_waiting, procStatCounts) || any(ss_waiting, subStreamStatCounts) ) { retVal = ss_waiting; } else if ( any(ps_failed, procStatCounts) || any(ss_failed, subStreamStatCounts) ) { retVal = ss_failed; } else if ( all(ps_success, procStatCounts) && all(ss_success, subStreamStatCounts) ) { retVal = ss_success; } stmt = connection.prepareStatement("update Stream set StreamStatus = ? where Stream = ?"); stmt.setString(1,retVal); stmt.setInt(2,stream); stmt.executeUpdate(); if (parentStream != 0) calculateStreamStatus(parentStream); } catch (SQLException ex) { // Trap SQL Errors ex.printStackTrace(); } finally { try{ if (connection != null || !connection.isClosed()) connection.close(); // Close the database connection } catch(SQLException ex){ ex.printStackTrace(); } } return retVal; } }
Loading Java:
%ORACLE_HOME%\bin\loadjava -user=GLAST_DP_TEST/BT33%%Q9]MU@glast-oracle02.slac.stanford.edu:1521:GLASTDEV -verbose -force -resolve %project_base%\src\main\java\org\glast\pipeline\server\sql\sp\*.java
Dropping Java:
%ORACLE_HOME%\bin\dropjava -user "GLAST_DP_TEST/BT33%Q9]MU@glast-oracle02.slac.stanford.edu:1521:GLASTDEV" -verbose org.glast.pipeline.server.sql.sp.OraEnv
Publishing (previously loaded) Java:
set CLASSPATH=%ORACLE_HOME%\sqlj\lib\translator.jar;%ORACLE_HOME%\sqlj\lib\runtime12.jar;%ORACLE_HOME%\jdbc\lib\ojdbc14.jar;%ORACLE_HOME%\sqlj\lib\utl_dbws.jar;%ORACLE_HOME%\jdbc\lib\orai18n.jar;%ORACLE_HOME%\sqlj\runtime12ee.jar;%ORACLE_HOME%\jpub\lib\jpub.jar;%ORACLE_HOME%\sqlj\lib\sqljutl.jar
%ORACLE_HOME%\bin\jpub -user=glast_dp_test/BT33%%Q9]MU -url=jdbc:oracle:thin:@glast-oracle02.slac.stanford.edu:1521:GLASTDEV -java=org.glast.pipeline.server.sql.sp.* -package=org.glast.pipeline.server.sql.spclient -compile=false -dir=%project_base%\src\main\java\
package org.glast.pipeline.server.sql.spclient; public class SPStream { public SPStream(java.sql.Connection conn) throws java.sql.SQLException { m_ctx = new sqlj.runtime.ref.DefaultContext(conn); } // MORE GENERATED CODE HERE <clipped> public java.lang.String calculateStreamStatus(int p0) throws java.lang.ClassNotFoundException { Object __jRt_0 = null; try { __jRt_0 = oracle.jpub.reflect.Client.invoke(_context(),null, "org.glast.pipeline.server.sql.sp.SPStream","calculateStreamStatus","I",new Object[]{new java.lang.Integer(p0)}); } catch (java.lang.ClassNotFoundException e) { throw e; } catch (Throwable e) { e.printStackTrace(); } return (java.lang.String)__jRt_0; } }
-- PL/SQL package that exposes the Java method
-- org.glast.pipeline.server.sql.sp.SPStream.calculateStreamStatus
-- as the function SP_STREAM.calculateStreamStatus.
CREATE OR REPLACE PACKAGE SP_STREAM
IS
   -- Package-specific types:
   TYPE CURSOR IS REF CURSOR;

   -- Java signature: public static String calculateStreamStatus(int stream)
   FUNCTION calculateStreamStatus(Stream_in IN NUMBER) RETURN VARCHAR2;
END SP_STREAM;
/

CREATE OR REPLACE PACKAGE BODY SP_STREAM
IS
   -- Call specification only: the implementation is the Java method named below.
   FUNCTION calculateStreamStatus(Stream_in IN NUMBER) RETURN VARCHAR2
   AS LANGUAGE JAVA
   NAME 'org.glast.pipeline.server.sql.sp.SPStream.calculateStreamStatus(int) return java.lang.String';
END SP_STREAM;
/
a