Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This is an attempt to define a new XML format for describing tasks for Pipeline II. I started from the XML since this is how the pipeline user will define tasks, so this is what the user will typically see (as opposed to the database schema, which is normally seen only by developers). Although it describes only the XML format, it clearly has many consequences for how the database would need to be modified and how the pipeline submission code would need to be implemented. It tries to address many of the concerns listed under requirements.

Code Block
xml
xml

<?xml version="1.0" encoding="UTF-8"?>
<pipeline
      xmlns="http://glast-ground.slac.stanford.edu/pipeline"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://glast-ground.slac.stanford.edu/pipeline http://glast-ground.slac.stanford.edu/Pipeline/schemas/2.0/pipeline.xsd">

   <!-- Variables declared at pipeline level. Both are read by name in the
        "partition" script step of the Data Processing task: MAX_STREAMS caps
        the number of sub-streams, and MIN_EVENTS_PER_STREAM is the divisor
        used to derive the stream count from the number of events. -->
   <variables>
      <var name="MAX_STREAMS">75</var>
      <var name="MIN_EVENTS_PER_STREAM">1000</var>
   </variables>   
      
   <task>
      <name>Data Processing</name>
      <type>Data</type>
      <notation>This is the main data processing task</notation>

      <!-- Things that must be specified when a stream is created for this task.
           "infile" is read by the partition script and by the Stage1 job
           (${infile}); "run" is used in the Stage1 output file name.
           NOTE(review): "infile" carries no type attribute, unlike the other
           prerequisites; presumably it defaults to a string. Confirm against
           the schema. -->
      <prerequisites>
          <prerequisite name="infile"/>
          <prerequisite name="run" type="int"/>
      </prerequisites>
      
      <step name="partition" type="script">
          <notation>Partition the data into n parallel streams</notation>
          <!--
             Splits the events of ${infile} across up to MAX_STREAMS "DataStream"
             sub-streams, aiming for at least MIN_EVENTS_PER_STREAM events each.
             Every sub-stream receives its own firstEvent / maxEvents values,
             matching the prerequisites declared by the DataStream sub-task.

             The script is wrapped in a CDATA section so that comparison
             operators such as "<" do not have to be escaped to keep the
             document well-formed.

             NOTE(review): assumes MAX_STREAMS and MIN_EVENTS_PER_STREAM reach
             the script as integers, and that fileService and pipeline are
             objects the pipeline places in the script scope. The common
             leading indentation of the script is kept as it was and presumably
             has to be stripped before execution. Confirm against the
             implementation.
          -->
          <script language="python"><![CDATA[
             nEvents = fileService.getNEvents(infile)

             # No events means no sub-streams. Otherwise always create at least
             # one, so inputs smaller than MIN_EVENTS_PER_STREAM are still
             # processed and the divisions below can never be by zero.
             nStreams = 0
             if nEvents > 0:
                 nStreams = max(1, min(nEvents // MIN_EVENTS_PER_STREAM, MAX_STREAMS))

             firstEvent = 0
             for i in range(nStreams):
                 # Hand the remainder out one event at a time to the first
                 # streams so every event belongs to exactly one sub-stream.
                 nEventsInStream = nEvents // nStreams
                 if i < nEvents % nStreams:
                     nEventsInStream += 1
                 streamVars = {"firstEvent": firstEvent, "maxEvents": nEventsInStream}
                 pipeline.createSubStream(i, "DataStream", streamVars)
                 firstEvent += nEventsInStream
          ]]></script>
      </step>

      <!-- Batch step that may run only after step "Stage1" of the "DataStream"
           sub-task (see <depends>).
           NOTE(review): no <job> is declared here yet, so the command to run
           is not defined; it is also not visible from this file whether
           "after" waits for every sub-stream or for just one. Confirm. -->
      <step name="createSummaryHistograms" type="batch">
          <depends>
             <after step="DataStream.Stage1"/>
          </depends>
      </step>      
      
      <!-- Batch step that may run only after step "Stage2" of the "DataStream"
           sub-task (see <depends>).
           NOTE(review): no <job> is declared here yet, so the command to run
           is not defined; it is also not visible from this file whether
           "after" waits for every sub-stream or for just one. Confirm. -->
      <step name="concatenateOutput" type="batch">
          <depends>
             <after step="DataStream.Stage2"/>
          </depends>
      </step> 
   
      <!-- Note, nested tasks can only be created as subtasks of the parent task -->
      <task>
        <name>DataStream</name>
        <notation>A subtask that is run in parallel to process some fraction of the data</notation>
        <!-- Supplied per sub-stream by the parent task's partition script via
             pipeline.createSubStream(...); forwarded to the Stage1 job as the
             FIRST_EVENT and MAX_EVENTS environment entries. -->
        <prerequisites>
          <prerequisite name="firstEvent" type="int"/>
          <prerequisite name="maxEvents" type="int"/>
        </prerequisites>
        
        <step name="Stage1" type="batch">
           <notation>Twiggle the Twoggles</notation>
           <!--
              Batch job for a single sub-stream. The event range comes from the
              firstEvent / maxEvents prerequisites of this sub-task and is
              handed to the job as FIRST_EVENT and MAX_EVENTS. The input and
              output file names are associated with INFILE and OUTFILE through
              the envvar attribute.

              NOTE(review): working-directory and log-file are blank
              placeholders. "stream" is not a declared prerequisite and is
              presumably the sub-stream id supplied by the pipeline. If format()
              follows printf conventions, %6d pads with spaces, so %06d may be
              what is wanted in a file name. Confirm.
           -->
           <job executable="${GLEAM_ROOT}/bin/Stage1.pl" maxCPU="1800">
              <working-directory> </working-directory>
              <log-file> </log-file>
              <input-file name="${infile}" envvar="INFILE"/>
              <output-file name="stage1-output-${format(run,%6d)}-${format(stream,%6d)}" envvar="OUTFILE"/>
              <environment>
                <var name="FIRST_EVENT">${firstEvent}</var>
                <var name="MAX_EVENTS">${maxEvents}</var>
              </environment>
           </job>
        </step>
        
        <!-- Second batch step of the sub-stream; may run only after "Stage1"
             of the same sub-task (see <depends>).
             NOTE(review): working-directory and log-file are blank
             placeholders, and no input/output files or environment entries
             are declared for this job. maxCPU is presumably in seconds.
             Confirm. -->
        <step name="Stage2" type="batch">
          <notation>Twoggle the Twiggles</notation>
          <job executable="${GLEAM_ROOT}/bin/Stage2.pl" maxCPU="3600">
             <working-directory> </working-directory>
             <log-file> </log-file>             
          </job>
          <depends>
             <after step="Stage1"/>
          </depends>
        </step>
      </task>
    </task>
</pipeline>