Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page holds a few example code-snippets for use in pyana analysis. The analysis is written in python and uses MatPlotLib.PyPlot for plotting of data. Compare with myana user examples to see how (some of) the same things can be done using the myana analysis framework.

Beamline data (Bld)

To read out energy, charge and position of the beam from the beamline data, use getEBeam(). It returns a class/structure that has the following members/fields:

The most reliable place for up-to-date information about all the event getters in pyana, see: https://confluence.slac.stanford.edu/display/PCDS/Pyana+Reference+Manual#PyanaReferenceManual-Classpyana.event.Event

For all the examples, you may assume that the pyana module contains a class with at least 'beginjob', 'event' and 'endjob' functions that starts something like this:

Code Block
none
none
titleoutline of a pyana module

import numpy as np
import matplotlib.pyplot as plt
from pypdsdata import xtc

class mypyana(object):
    def __init__(self,source="")
Code Block
nonenone

def event(self,evt,env):

    ebeam = evt.getEBeam()
    try :
        beamChrgself.source = ebeam.fEbeamChargesource
        beamEnrgself.counter = ebeam.fEbeamL3EnergyNone
        beamPosXself.array = ebeam.fEbeamLTUPosX
[]   # really just a list

 beamPosY   = ebeam.fEbeamLTUPosYdef beginjob(self,evt,env):
        beamAngXself.counter = ebeam.fEbeamLTUAngX 0

    def event(self,evt,env):
        beamAngYself.counter += ebeam.fEbeamLTUAngY1

        # beamPkCrsnippet code =goes ebeam.fEbeamPkCurrBC2here
        printthedata "ebeam: ", beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr= evt.get(xtc.TypeId.Type.Id_SomeType, self.source )
        self.array.append( thedata.somevalue )

    exceptdef endjob(self,evt,env):
        print "No EBeam object found"

To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet(). This returns and array of 4 numbers:

Code Block
nonenone
Job done! Processed %d events. " % self.counter

    fee_energy_array = evt.getFeeGasDet()
 # place for gdENRC11 = fee_energy_array[0]plotting etc

    gdENRC12 = fee_energy_array[1]
    gdENRC21 = fee_energy_array[2]
    gdENRC22 = fee_energy_array[3]

    energy = (gdENRC21 - gdENRC22) / 2
 # convert from python list to a numpy array
       self.array = np.array( self.array )

       # orplot usegraph
 the first two that has a different gain:
    energy = (gdENRC11 - gdENRC12) / 2
 plt.plot(self.array)

BeamLine Data: EBeam

To read out fit time and charge of the phase cavity, use getPhaseCavity() which returns a structure with the following fields:energy, charge and position of the beam from the beamline data, use getEBeam(). It returns a class/structure that has the following members/fields:

Code Block
none
none
titlegetEBeam

def event(self,evt,env):

    ebeam
Code Block

        pc = evt.getPhaseCavitygetEBeam()
    try :
   if  pc :
  beamChrg = ebeam.fEbeamCharge
        pcFitTime1beamEnrg = pcebeam.fFitTime1fEbeamL3Energy
        beamPosX = ebeam.fEbeamLTUPosX
  pcFitTime2   = pc.fFitTime2
  beamPosY = ebeam.fEbeamLTUPosY
        pcCharge1beamAngX = pcebeam.fCharge1fEbeamLTUAngX
            pcCharge2beamAngY = pcebeam.fCharge2fEbeamLTUAngY
        beamPkCr    = ebeam.fEbeamPkCurrBC2
        print "PhaseCavityebeam: ", pcFitTime1beamChrg, beamEnrg, pcFitTime2beamPosX, pcCharge1beamPosY, pcCharge2
beamAngX, beamAngY, beamPkCr
      else except:
            print "No PhaseEBeam Cavity object found"

Encoder data (delay scanner)

BeamLine Data: FEE Gas Detector

To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet(). This returns and array of 4 numbers:

Code Block
none
none
titlegetFeeGasDet
Code Block
nonenone

def event(self,evt,env):
    try:
        encoder fee_energy_array = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source getFeeGasDet()
    gdENRC11 =   encoder_value = encoder.value()
    except:fee_energy_array[0]
    gdENRC12 = fee_energy_array[1]
    gdENRC21 = fee_energy_array[2]
  print "No encodergdENRC22 found in this event"
        return

You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!):

Code Block
nonenone

     # Encoder Parameters to convert to picoseconds= fee_energy_array[3]

    energy = (gdENRC21 + gdENRC22) / 2.0
    # or use the first two that has a different gain:
    energy delay_a = -80.0e-6;= (gdENRC11 + gdENRC12) / 2.0

BeamLine Data: Phase Cavity

To read out fit time and charge of the phase cavity, use getPhaseCavity() which returns a structure with the following fields:

Code Block
none
none
titlegetPhaseCavity
     delay_bpc = 0.52168;evt.getPhaseCavity()
     delay_c = 299792458;
try:
         delay_0pcFitTime1 = 0;

pc.fFitTime1
     delay_time = (delay_a * encoder_valuepcFitTime2 + delay_b)*1.e-3 / delay_c) 
= pc.fFitTime2
         delay_timepcCharge1 = 2pc.fCharge1
 * delay_time / 1.0e-12 + delay_0 + pcFitTime1

Time data

The time of the event can be obtained within the event function:

Code Block
nonenone

def event ( self, evt, env ) :    pcCharge2 = pc.fCharge2
         print "PhaseCavity: ", pcFitTime1,  pcFitTime2, pcCharge1, pcCharge2
    event_time  = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )

IPIMB diode data

Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:

except :
         print "No Phase Cavity object found"

Event code

Code Block
none
none
titleEnvData
Code Block
nonenone
def event(self, evt, env):
    # raw data
    ipmRaw evrdata = evt.get(xtc.TypeId.Type.Id_IpimbData, source getEvrData("NoDetector-0|Evr-0")
    try:
    for i in  ch = [ipmRaw.channel0(),range (evrdata.numFifoEvents()):
        print "Event     ipmRaw.channel1(),
              ipmRaw.channel2(),
              ipmRaw.channel3() ]
                
        ch_volt = [ipmRaw.channel0Volts(),
                   ipmRaw.channel1Volts(),
                   ipmRaw.channel2Volts(),
    code: ", evrdata.fifoEvent(i).EventCode

In the example above, the address of the EvrData object is given as "NoDetector-0|Evr-0". The address may be different in other cases, so make sure you have the correct address. If you don't know what it is, you can use 'pyxtcreader -vv <xtcfile> | less' to browse your xtcfile and look for it. Look for a lines with 'contains=EvrConfig_V' or 'contains=EvrData_V'. The address will be found on the same line in 'src=DetInfo(<address>)'

Encoder data (delay scanner)

Code Block
none
none
titleEncoderData

def event(self,evt,env):
    try:
        encoder = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
        encoder_value = ipmRawencoder.channel3Voltsvalue() ]
    except:
        pass

print "No encoder found #in feature-extracted datathis event"
    ipmFex = evt.get(xtc.TypeId.Type.Id_IpmFex, source ) return

You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!):

Code Block
none
none
    try:
# Encoder Parameters to convert to picoseconds
   # arraydelay_a of 4 numbers= -80.0e-6;
    delay_b = 0.52168;
    fexdelay_channelc = ipmFex.channel 
299792458;
    delay_0     # scalar values= 0;

    delay_time = (delay_a *  fexencoder_sumvalue = ipmFex.sum 
  + delay_b)*1.e-3 / delay_c) 
       fex_xposdelay_time = ipmFex.xpos
2 * delay_time / 1.0e-12 +    fex_ypos = ipmFex.ypos

     except:
         pass

delay_0 + pcFitTime1

Time data

The time of the event can be obtained within the event function:

Code Block
none
none
titlegetTime
def event ( self, evt, env ) :
    ipmevent_time = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )

IPIMB diode data

This is data from sets of 4 diodes around the beam line (Intensity Position, Intensity Monitoring Boards)
that measures the beam intensity in four spots, from which we can also deduce the position of the beam.

Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:

Code Block
none
none
titleDetInfo

def event(self, evt, env):
    # raw data
    ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source )
    try:getSharedIpimbValue("HFX-DG3-IMB-02")
    # or equivalently:
    # ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try: 
        ### Raw data ###
        # arrays of 4 numbers:
        ch = [ipm.ipimbData.channel0(),
              ipm.ipimbData.channel1(),
              ipm.ipimbData.channel2(),
              ipm.ipimbData.channel3()]
        ch_volt = [ipmipmRaw.ipimbData.channel0Voltschannel0(),
                   ipm.ipimbData.channel1VoltsipmRaw.channel1(),
                   ipm.ipimbData.channel2VoltsipmRaw.channel2(),
                   ipm.ipimbData.channel3VoltsipmRaw.channel3() ]

        ### Feature-extracted data ###
     
   # array of 4 numbers:    ch_volt = [ipmRaw.channel0Volts(),
        fex_channels = ipm.ipmFexData.channel 
        ipmRaw.channel1Volts(),
        # scalars:
        fex_sum = ipm.ipmFexData.sum 
 ipmRaw.channel2Volts(),
       fex_xpos = ipm.ipmFexData.xpos
        fex_ypos = ipm.ipmFexData.ypos

 ipmRaw.channel3Volts()]
    except:
         pass

Acqiris waveform data

This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.

Initialize class members:

Code Block
    def __init__ ( self ):# feature-extracted data
    ipmFex    # initialize data= evt.get(xtc.TypeId.Type.Id_IpmFex, source )
    try:
        self.address =  "AmoITof-0|Acqiris-0"
 # array of 4 numbers
         self.datafex_channel = []ipmFex.channel 

        self.counter = 0

If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:

Code Block
 # scalar values
         fex_sum = ipmFex.sum 
    def beginjob ( self, evt, envfex_xpos )= :ipmFex.xpos
         cfgfex_ypos = env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address )ipmFex.ypos

     except:
        self.num = cfg.nbrChannels()

The read the event waveform data (an array) and append it to the self.data list:

 pass

Code Block
none
none
titleBldInfo

def event(
Code Block

    def event ( self, evt, env ) :
    ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
  channel = 0
# or equivalently:
    #  acqDataipm = evt.getAcqValue( self.address, channel, envget(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try: 
   if acqData :
   ### Raw data ###
      self.counter+=1
  # arrays of 4 numbers:
        wfch = acqData.waveform[ipm.ipimbData.channel0(),
   #      returns a waveform array of numpyipm.ndarray type.ipimbData.channel1(),
            self  ipm.dataipimbData.appendchannel2(wf)

...

,

...

Code Block
    def endjob( self, env ) :
        ipm.ipimbData.channel3()]
        datach_volt = np.array(self.data)  # this is an array of shape (Nevents, nSamples)

[ipm.ipimbData.channel0Volts(),
                  # take the mean of all events for each sampling time
 ipm.ipimbData.channel1Volts(),
                 xs = npipm.meanipimbData.channel2Volts(data), axis=0)


                   pltipm.ipimbData.plotchannel3Volts(xs)]

        plt.xlabel('Seconds')### Feature-extracted data ###
        plt.ylabel('Volts') # array of 4 numbers:
        plt.show()

Which gives you a plot like this
Image Removed

Display images from princeton camera

When plotting with MatPlotLib, we don't need to set the limits of the histogram manually, thus we don't need to read the Princeton configuration for this. If we want to sum the image from several events, we must first define and initialize some variables:

Code Block

   def __init__ ( self ):
        # initialize data
        self.address =  "SxrEndstation-0|Princeton-0"
        self.data = None

In each event, we add the image array returned from the getPrincetonValue function:

fex_channels = ipm.ipmFexData.channel 
        
        # scalars:
        fex_sum = ipm.ipmFexData.sum 
        fex_xpos = ipm.ipmFexData.xpos
        fex_ypos = ipm.ipmFexData.ypos

     except:
         pass

Acqiris waveform data

This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.

Initialize class members:

Code Block
none
none

    def __init__ ( self ):
        # initialize data
        self.address =  "AmoITof-0|Acqiris-0"
        self.data = []
        self.counter = 0

If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:

Code Block
none
none

    def beginjob ( self, evt, env ) :
        cfg = env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address )
        self.num = cfg.nbrChannels()

The read the event waveform data (an array) and append it to the self.data list:

Code Block
none
none

    def event ( self, evt, env ) :
        channel = 0
        acqData = evt.getAcqValue( self.address, channel, env)
        if acqData :
            self.counter+=1
            wf = acqData.waveform()   # returns a waveform array of numpy.ndarray type.
            self.data.append(wf)

At the end of the job, take the average and plot it:

Code Block
none
none

    def endjob( self, env ) :

        data = np.array(self.data)  # this is an array of shape (Nevents, nSamples)

        # take the mean of all events for each sampling time
        xs = np.mean(data, axis=0)

        plt.plot(xs)

        plt.xlabel('Seconds')
        plt.ylabel('Volts')
        plt.show()

Which gives you a plot like this
Image Added

Princeton camera image

When plotting with MatPlotLib, we don't need to set the limits of the histogram manually, thus we don't need to read the Princeton configuration for this. If we want to sum the image from several events, we must first define and initialize some variables:

Code Block
none
none

   def __init__ ( self ):
        # initialize data
        self.address =  "SxrEndstation-0|Princeton-0"
        self.data = None

In each event, we add the image array returned from the getPrincetonValue function:

Code Block
none
none
titlegetPrincetonValue

  def event ( self, evt, env ) :

       frame = evt.getPrincetonValue( self.address, env)
       if frame :
           # accumulate the data
           if self.data is None :
               self.data = np.float_(frame.data())
           else :
               self.data += frame.data()

At the end of the job, display/save the array:

Code Block

   def endjob( self, env ) :
        plt.imshow( self.data/self.countpass, origin='lower')
        plt.colorbar()
        plt.show()

        # save the full image to a png file
        plt.imsave(fname="pyana_princ_image.png",arr=self.data, origin='lower')

Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, lower resolution, you can save it from the interactive window you get from plt.show().
Image Added

PnCCD image

Code Block
none
none
titlegetPnCcdValue

def event(self,evt,env):
    try:
        frame = evt.getPnCcdValue( self.source, env )
        image = frame.data()
    except:
        pass

Other image (FCCD*,Opal,PIM (TM6740), ... )

These all use the generic getFrameValue function:

Code Block
none
none
titlegetFrameValue

def event(self,evt,env):
    try:
        frame = evt.getFrameValue( self.source )
        image = frame.data()
    except:
        pass

FCCD (Fast CCD) image

The Fast CCD is read out as two 8-bit images, therefore you need this extra line to convert it to the right format.

Code Block
none
none
titlegetFrameValue

def event(self,evt,env):
    try:
        frame = evt.getFrameValue( self.source )
        image = frame.data()
    except:
        pass

    # convert to 16-bit integer
    image.dtype = np.uint16

CsPad data

Here's an example of getting CsPad data from an event:

Code Block
none
none
titlegetCsPadQuads

def event(self,evt,env):
    quads = evt.getCsPadQuads(self.img_source, env)
    if not quads :
        print '*** cspad information is missing ***'
        return
        
    # dump information about quadrants
    print "Number of quadrants: %d" % len(quads)
    
    for q in quads:
        print "  Quadrant %d" % q.quad()
        print "    virtual_channel: %s" % q.virtual_channel()
        print "    lane: %s" % q.lane()
        print "    tid: %s" % q.tid()
        print "    acq_count: %s" % q.acq_count()
        print "    op_code: %s" % q.op_code()
        print "    seq_count: %s" % q.seq_count()
Code Block

  def event ( self, evt, env ) :

       frame = evt.getPrincetonValue( self.address, env)
       if frame :
           # accumulate the data
           if self.data is None :
        print "    ticks: %s" self.data = np.float_(frame.data())% q.ticks()
        print "   else fiducials: %s" % q.fiducials()
        print "    frame_type: %s" % selfq.data += frame.dataframe_type()

At the end of the job, display/save the array:

Code Block

   def endjob( self, env ) :
print "    sb_temp: %s" % plt.imshow( self.data/self.countpass, origin='lower'map(q.sb_temp, range(4))
        plt.colorbar()
    
    plt.show()

    # image data as # save the full image to a png file
        plt.imsave(fname="pyana_princ_image.png",arr=self.data, origin='lower')

Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, lower resolution, you can save it from the interactive window you get from plt.show().
Image Removed

CsPad data

Here's an example of getting CsPad data from an event:

3-dimentional array
        data = q.data()

So far so good. 'quads' is a list of CsPad Element objects, and not necessarily ordered in the expected way. So you'll need to use q.quad() to obtain the quad number.
q.data() gives you a 3D numpy array [row][col][sec]. Here sections will be ordered as expected, but be aware in case of missing sections, that you may need to check the
configuration object. You can get that from the env object, typically something you do in beginjob:

Code Block
none
none

def beginjob(self,evt,env):
    config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
    if not config:
        print '*** cspad config object is missing ***'
 
Code Block
nonenone

def event(self,evt,env):
    quads = evt.getCsPadQuads(self.img_source, env)
    if not quads :
        print '*** cspad information is missing ***'
        return
        
    # dump information about quadrants
    print "Number of quadrants: %d" % len(quads)
    
    for q in quads:
       return print  "  Quadrant %d" % q.quad()
    print "Cspad configuration"
    print "  N quadrants   virtual_channel: %s%d" % qconfig.virtual_channelnumQuads()
    print "  Quad printmask "    lane: %s%#x" % qconfig.lanequadMask()
        print "  payloadSize  tid : %s%d" % qconfig.tidpayloadSize()
        print "  badAsicMask0  acq_count: %s%#x" % qconfig.acq_countbadAsicMask0()
        print "  badAsicMask1  op_code: %s%#x" % qconfig.op_codebadAsicMask1()
    print "  asicMask print "    seq_count: %s%#x" % qconfig.seq_countasicMask()
        print "  numAsicsRead  ticks: %s%d" % qconfig.ticksnumAsicsRead()

   # get the indices of printsections "in use:
   fiducials:qn %s" % q.fiducials()
= range(0,config.numQuads())        print "    frame_type: %s" % q.frame_type()
   self.sections = map(config.sections, qn ) print "    sb_temp: %s" % map(q.sb_temp, range(4))
            
        # image data as 3-dimentional array
        data = q.data()

data2 will give you the third section stored, but be aware that sections sometimes are missing,
and in this case you'll need to check with the configuration information that you can obtain in beginjob:

 

If you want to draw the whole CsPad image, there's currently no pyana function that does this. Pyana only supplies the pixels in a numpy array, and the
exact location of each pixel depends on the conditions at the time of data collection. A simplified way of making the image can be seen in cspad_simple.py(newer version (cspad.py) available if you check out the XtcExplorer package).

CSPad pixel coordinates.

The CSPad detector image can be drawn by positioning the sections from the data array into a large image array. This is done in cspad_simple.py above. The positions are extracted from optical meterology measurements and additional calibrations. Alternatively one can find the coordinate of each individual pixel from a pixel map, based on the same optical metrology measurements. This is described in details here

Epics Process Variables and ControlConfig

EPICS data is different from DAQ event data. It stores the conditions and settings of the instruments, but values typically change more slowly than your
average shot-by-shot data, and EPICS data is typically updated only when it changes, or every second, or similar. It is not stored in the 'evt' (event) object,
but in the 'env' (environment) object. You typically would read it only at the beginning of each job or if your doing a scan, you'd read it in every calibration cycle:

Code Block
none
none
titleenv.epicsStore()

def begincalibcycle(self,evt,env):

    ## The returned value should be of the type epics.EpicsPvTime.
    pv = env.epicsStore().value( pv_name )
    if not pv:
        logging.warning('EPICS PV %s does not exist', pv_name)
    else:
        value = pv.value 
        status = pv.status 
Code Block
nonenone

def beginjob(self,evt,env):
    config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
    if not config:
        print '*** cspad config object is missing ***'
        return
        
    quads = range(4)

    print 
    print "Cspad configuration"
    print "  N quadrants   : %d" % config.numQuads()
    print "  Quad mask     : %#x" % config.quadMask()
    print "  payloadSize   : %d" % config.payloadSize()
    print "  badAsicMask0  : %#x" % config.badAsicMask0()
    print "  badAsicMask1  : %#x" % config.badAsicMask1()
    print "  asicMask      : %#x" % config.asicMask()
    print "  numAsicsRead  : %d" % config.numAsicsRead()
    try:
        # older versions may not have all methods
        printalarm_severity "=  roiMask   pv.severity 
Code Block
none
none
titleControlConfig

def begincalibcycle(self,evt,env):
    : [%s]" % ', '.join([hex(config.roiMask(q)) for q in quads])ctrl_config = env.getConfig(xtc.TypeId.Type.Id_ControlConfig)
    
    nControls    print "  numAsicsStored: %s" % str(map(config.numAsicsStored, quads))= ctrl_config.npvControls()
    except:
for ic in range (0, nControls ):

   pass
    print "cpv  sections= ctrl_config.pvControl(ic)
       : %s"name % str(map(config.sections, quads))
    print
= cpv.name()
        value = cpv.value()