Possible XML for Pipeline II

NOTE: Work in progress; comments, suggestions, etc. welcome.

This is an attempt to define a new XML format for describing tasks for Pipeline II. I started from the XML since this is how the pipeline user will define tasks, and so it is what the user will typically see (as opposed to the database schema, which is normally seen only by developers). Although this page describes only the XML format, it clearly has many consequences for how the database would need to be modified and how the pipeline submission code would need to be implemented. It tries to address many of the concerns listed under requirements.

Notes

  1. One XML file can define one or more tasks.
  2. Tasks may be nested to create subtasks.
  3. Two types of steps are envisioned: scripts, which run in-line in the pipeline, and jobs, which run as (LSF) batch jobs.
  4. I assume that we will probably write the scripts in Python. I am not good at Python, so treat the script in the example below as a rough sketch.
  5. Variables can be defined at the pipeline, task, subtask, or step level. They can also be passed in at runtime via createStream() (the replacement for createRun); see the usage sketch after the example XML.
  6. Prerequisites may be specified for a task; these list the variables that must be passed in to createStream() (and, optionally, their types).
  7. Variables are inherited from the parent context; for example, a subtask inherits all of its parent task's variables, and a step inherits all of its enclosing task's variables.
  8. Variables can be accessed from (Python) scripts, and can also be used as the value of XML attributes via ${...} substitution (JSP style).
  9. Variables are passed to batch jobs as environment variables prefixed with "PIPELINE_" (see the sketch after this list).
  10. There are two types of tasks:
    1. sequential (like the current pipeline) processes its steps strictly in order.
    2. parallel processes steps in any order, possibly running several at the same time. Explicit <depends> clauses are required on the steps to control the order of execution (see the sketch after this list).
  11. All access to the file system is via a fileService, since we don't want to assume that the server has direct access to the file system (see the sketch after this list). The file service should allow things such as:
    1. Check existence of files
    2. Get size/modification date of files
    3. Allow files to be deleted?
    4. Find # events in a file?
  12. Tasks should be able to request space for output data, logs, etc. from a diskService (also shown in the sketch after this list).
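
For note 9, here is a minimal sketch of how a batch job could read its inherited variables, assuming the job's executable were written in Python (the example below uses Perl executables) and assuming the prefix is simply prepended to the variable name unchanged; both are assumptions, since the exact convention is not yet defined:

import os

# Hypothetical: pipeline variables arrive as environment variables (note 9).
# The exact casing of the suffix (firstEvent vs FIRST_EVENT) is an assumption.
firstEvent = int(os.environ["PIPELINE_firstEvent"])
maxEvents = int(os.environ["PIPELINE_maxEvents"])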
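
For note 10, a parallel task might look something like the sketch below. How the sequential/parallel choice is actually expressed in the XML is still open; the mode attribute used here is purely an assumption:

<!-- Hypothetical sketch: the "mode" attribute is an assumed spelling -->
<task mode="parallel">
   <name>ParallelExample</name>
   <step name="A" type="batch"/>
   <step name="B" type="batch"/>
   <!-- C may only start once both A and B have completed -->
   <step name="C" type="batch">
      <depends>
         <after step="A"/>
         <after step="B"/>
      </depends>
   </step>
</task>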
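
For notes 11 and 12, a script step could use the two services roughly as below. fileService.getNEvents() is used in the example XML; the other names (fileService.exists(), fileService.getSize(), diskService.requestSpace(), pipeline.fail()) are placeholders for whatever the real service interfaces end up providing:

<step name="prepare" type="script">
   <script language="python">
      # Sanity-check the input via the file service (note 11)
      if not fileService.exists(infile):           # assumed method name
          pipeline.fail("missing input " + infile) # assumed method name
      size = fileService.getSize(infile)           # assumed method name
      # Ask the disk service for space for output data and logs (note 12)
      outDir = diskService.requestSpace(2 * size)  # assumed method name
   </script>
</step>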

Example XML

<?xml version="1.0" encoding="UTF-8"?>
<pipeline
      xmlns="http://glast-ground.slac.stanford.edu/pipeline"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://glast-ground.slac.stanford.edu/pipeline http://glast-ground.slac.stanford.edu/Pipeline/schemas/2.0/pipeline.xsd">

   <variables>
      <var name="MAX_STREAMS">75</var>
      <var name="MIN_EVENTS_PER_STREAM">1000</var>
   </variables>   
      
   <task>
      <name>Data Processing</name>
      <type>Data</type>
      <notation>This is the main data processing task</notation>

      <!-- Things that must be specified when a stream is created for this task -->
      <prerequisites>
          <prerequisite name="infile"/>
          <prerequisite name="run" type="int"/>
      </prerequisites>
      
      <step name="partition" type="script">
          <notation>Partition the data into n parallel streams</notation>
          <script language="python">
             nEvents = fileService.getNEvents(infile)
             # Split into at most MAX_STREAMS streams of roughly equal size,
             # with at least MIN_EVENTS_PER_STREAM events per stream
             nStreams = max(1, min(nEvents // MIN_EVENTS_PER_STREAM, MAX_STREAMS))
             nEventsPerStream = nEvents // nStreams
             firstEvent = 0
             for i in range(nStreams):
                 # The last stream also picks up any remainder from the division
                 if i == nStreams - 1:
                     nEventsPerStream = nEvents - firstEvent
                 streamVars = {"firstEvent": firstEvent, "maxEvents": nEventsPerStream}
                 pipeline.createSubStream(i, "DataStream", streamVars)
                 firstEvent += nEventsPerStream
          </script>
      </step>

      <step name="createSummaryHistograms" type="batch">
          <depends>
             <after step="DataStream.Stage1"/>
          </depends>
      </step>      
      
      <step name="concatenateOutput" type="batch">
          <depends>
             <after step="DataStream.Stage2"/>
          </depends>
      </step> 
   
      <!-- Note, nested tasks can only be created as subtasks of the parent task -->
      <task>
        <name>DataStream</name>
        <notation>A subtask that is run in parallel to process some fraction of the data</notation>
        <prerequisites>
          <prerequisite name="firstEvent" type="int"/>
          <prerequisite name="maxEvents" type="int"/>
        </prerequisites>
        
        <step name="Stage1" type="batch">
           <notation>Twiggle the Twoggles</notation>
           <job executable="${GLEAM_ROOT}/bin/Stage1.pl" maxCPU="1800">
              <working-directory> </working-directory>
              <log-file> </log-file>    
              <input-file name="${infile}" envvar="INFILE"/>
              <output-file name="stage1-output-${format(run,%6d)}-${format(stream,%6d)}" envvar="OUTFILE"/>
              <environment>
                <var name="FIRST_EVENT">${firstEvent}</var> 
                <var name="MAX_EVENTS">${maxEvents}</var>
              </environment>
           </job>
        </step>
        
        <step name="Stage2" type="batch">
          <notation>Twoggle the Twiggles</notation>
          <job executable="${GLEAM_ROOT}/bin/Stage2.pl" maxCPU="3600">
             <working-directory> </working-directory>
             <log-file> </log-file>             
          </job>
          <depends>
             <after step="Stage1"/>
          </depends>
        </step>
      </task>
    </task>
</pipeline>
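
To tie the example together, creating a stream for the "Data Processing" task might look like the sketch below. createStream() is the replacement for createRun mentioned in note 5; its exact signature is still open, so the task-name-plus-variable-map form used here is an assumption (the file path is made up):

# Hypothetical client-side sketch: create a stream, supplying the task's
# two prerequisites (note 6) as runtime variables (note 5)
pipeline.createStream("Data Processing",
                      {"infile": "/data/run000123/events.dat",  # made-up path
                       "run": 123})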