You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

This page holds a few example code-snippets for use in pyana analysis. The analysis is written in python and uses MatPlotLib.PyPlot for plotting of data. Compare with myana user examples to see how (some of) the same things can be done using the myana analysis framework.

Encoder data (delay scanner)

def event(self,evt,env):
    try:
        encoder = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
        encoder_value = encoder.value()
    except:
        print "No encoder found in this event"
        return

You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!):

     # Encoder Parameters to convert to picoseconds
     delay_a = -80.0e-6;
     delay_b = 0.52168;
     delay_c = 299792458;
     delay_0 = 0;

    delay_time = (delay_a * encoder_value + delay_b)*1.e-3 / delay_c) 
    delay_time = 2 * delay_time / 1.0e-12 + delay_0 + phase_cavity1

Time data

The time of the event can be obtained within the event function:

def event ( self, evt, env ) :
    event_time = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )

IPIMB diode data

Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:

def event(self, evt, env):
    # raw data
    ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source )
    try:
        ch = [ipmRaw.channel0(),
              ipmRaw.channel1(),
              ipmRaw.channel2(),
              ipmRaw.channel3() ]
                
        ch_volt = [ipmRaw.channel0Volts(),
                   ipmRaw.channel1Volts(),
                   ipmRaw.channel2Volts(),
                   ipmRaw.channel3Volts() ]
    except:
        pass

    # feature-extracted data
    ipmFex = evt.get(xtc.TypeId.Type.Id_IpmFex, source )
    try:
         # array of 4 numbers
         fex_channel = ipmFex.channel 

         # scalar values
         fex_sum = ipmFex.sum 
         fex_xpos = ipmFex.xpos
         fex_ypos = ipmFex.ypos

     except:
         pass

def event(self, evt, env):
    ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
    # or equivalently:
    # ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try: 
        ### Raw data ###
        # arrays of 4 numbers:
        ch = [ipm.ipimbData.channel0(),
              ipm.ipimbData.channel1(),
              ipm.ipimbData.channel2(),
              ipm.ipimbData.channel3()]
        ch_volt = [ipm.ipimbData.channel0Volts(),
                   ipm.ipimbData.channel1Volts(),
                   ipm.ipimbData.channel2Volts(),
                   ipm.ipimbData.channel3Volts()]

        ### Feature-extracted data ###
        # array of 4 numbers:
        fex_channels = ipm.ipmFexData.channel 
        
        # scalars:
        fex_sum = ipm.ipmFexData.sum 
        fex_xpos = ipm.ipmFexData.xpos
        fex_ypos = ipm.ipmFexData.ypos

     except:
         pass

Acqiris waveform data

This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.

Initialize class members:

    def __init__ ( self ):
        # initialize data
        self.address =  "AmoITof-0|Acqiris-0"
        self.data = []
        self.counter = 0

If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:

    def beginjob ( self, evt, env ) :
        cfg = env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address )
        self.num = cfg.nbrChannels()

The read the event waveform data (an array) and append it to the self.data list:

    def event ( self, evt, env ) :
        channel = 0
        acqData = evt.getAcqValue( self.address, channel, env)
        if acqData :
            self.counter+=1
            wf = acqData.waveform()   # returns a waveform array of numpy.ndarray type.
            self.data.append(wf)

At the end of the job, take the average and plot it:

    def endjob( self, env ) :

        data = np.array(self.data)  # this is an array of shape (Nevents, nSamples)

        # take the mean of all events for each sampling time
        xs = np.mean(data, axis=0)

        plt.plot(xs)

        plt.xlabel('Seconds')
        plt.ylabel('Volts')
        plt.show()

Which gives you a plot like this

Beamline data (Bld)

To read out energy, charge and position of the beam from the beamline data, use getEBeam(). It returns a class/structure that has the following members/fields:

       ebeam = evt.getEBeam()
        if ebeam :
            beamChrg = ebeam.fEbeamCharge
            beamEnrg = ebeam.fEbeamL3Energy
            beamPosX = ebeam.fEbeamLTUPosX
            beamPosY = ebeam.fEbeamLTUPosY
            beamAngX = ebeam.fEbeamLTUAngX
            beamAngY = ebeam.fEbeamLTUAngY
            beamPkCr = ebeam.fEbeamPkCurrBC2
            print "ebeam: ", beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr
        else :
            print "No EBeam object found"

To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet(). This returns and array of 4 numbers:

       fee_energy_array = evt.getFeeGasDet()
        gdENRC11 = fee_energy_array[0]
        gdENRC12 = fee_energy_array[1]
        gdENRC21 = fee_energy_array[2]
        gdENRC22 = fee_energy_array[3]
        print "GasDet energy ", gdENRC11, gdENRC12, gdENRC21, gdENRC22

To read out fit time and charge of the phase cavity, use getPhaseCavity() which returns a structure with the following fields:

        pc = evt.getPhaseCavity()
        if pc :
            pcFitTime1 = pc.fFitTime1
            pcFitTime2 = pc.fFitTime2
            pcCharge1 = pc.fCharge1
            pcCharge2 = pc.fCharge2
            print "PhaseCavity: ", pcFitTime1,  pcFitTime2, pcCharge1, pcCharge2
        else :
            print "No Phase Cavity object found"

Display images from princeton camera

When plotting with MatPlotLib, we don't need to set the limits of the histogram manually, thus we don't need to read the Princeton configuration for this. If we want to sum the image from several events, we must first define and initialize some variables:

   def __init__ ( self ):
        # initialize data
        self.address =  "SxrEndstation-0|Princeton-0"
        self.data = None

In each event, we add the image array returned from the getPrincetonValue function:

  def event ( self, evt, env ) :

       frame = evt.getPrincetonValue( self.address, env)
       if frame :
           # accumulate the data
           if self.data is None :
               self.data = np.float_(frame.data())
           else :
               self.data += frame.data()

At the end of the job, display/save the array:

   def endjob( self, env ) :
        plt.imshow( self.data/self.countpass, origin='lower')
        plt.colorbar()
        plt.show()

        # save the full image to a png file
        plt.imsave(fname="pyana_princ_image.png",arr=self.data, origin='lower')

Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, lower resolution, you can save it from the interactive window you get from plt.show().

CsPad data

Here's an example of getting CsPad data from an event:

def event(self,evt,env):
    quads = evt.getCsPadQuads(self.img_source, env)
    if not quads :
        print '*** cspad information is missing ***'
        return
        
    # dump information about quadrants
    print "Number of quadrants: %d" % len(quads)
    
    for q in quads:
        print "  Quadrant %d" % q.quad()
        print "    virtual_channel: %s" % q.virtual_channel()
        print "    lane: %s" % q.lane()
        print "    tid: %s" % q.tid()
        print "    acq_count: %s" % q.acq_count()
        print "    op_code: %s" % q.op_code()
        print "    seq_count: %s" % q.seq_count()
        print "    ticks: %s" % q.ticks()
        print "    fiducials: %s" % q.fiducials()
        print "    frame_type: %s" % q.frame_type()
        print "    sb_temp: %s" % map(q.sb_temp, range(4))
            
        # image data as 3-dimentional array
        data = q.data()

data2 will give you the third section stored, but be aware that sections sometimes are missing,
and in this case you'll need to check with the configuration information that you can obtain in beginjob:

def beginjob(self,evt,env):
    config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
    if not config:
        print '*** cspad config object is missing ***'
        return
        
    quads = range(4)

    print 
    print "Cspad configuration"
    print "  N quadrants   : %d" % config.numQuads()
    print "  Quad mask     : %#x" % config.quadMask()
    print "  payloadSize   : %d" % config.payloadSize()
    print "  badAsicMask0  : %#x" % config.badAsicMask0()
    print "  badAsicMask1  : %#x" % config.badAsicMask1()
    print "  asicMask      : %#x" % config.asicMask()
    print "  numAsicsRead  : %d" % config.numAsicsRead()
    try:
        # older versions may not have all methods
        print "  roiMask       : [%s]" % ', '.join([hex(config.roiMask(q)) for q in quads])
        print "  numAsicsStored: %s" % str(map(config.numAsicsStored, quads))
    except:
        pass
    print "  sections      : %s" % str(map(config.sections, quads))
    print
        
  • No labels