Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page holds a few example code-snippets for use in pyana analysis. The analysis is written in python and uses MatPlotLib.PyPlot for plotting of data. Compare with myana user examples to see how (some of) the same things can be done using the myana analysis framework. The most reliable place for up-to-date information about all the event getters in pyana, see: https://confluence.slac.stanford.edu/display/PCDS/Pyana+Reference+Manual#PyanaReferenceManual-Classpyana.event.EventImage Removed

For all the examples, you may assume that the pyana module contains a class with at least 'beginjob', 'event' and 'endjob' functions that starts something like this:

Code Block
none
none
titleoutline of a pyana module
import numpy as np
import matplotlib.pyplot as plt
from pypdsdata import xtc

class mypyana(object):
    def __init__(self,source=""):
        self.source = source
        self.counter = None
        self.array = []   # really just a list

    def beginjob(self,evt,env):
        self.counter = 0

    def event(self,evt,env):
        self.counter += 1

        # snippet code goes here
        thedata = evt.get(xtc.TypeId.Type.Id_SomeType, self.source )
        self.array.append( thedata.somevalue )

    def endjob(self,evt,env):
       print "Job done! Processed %d events. " % self.counter

       # place for plotting etc

BeamLine Data: EBeam

To read out energy, charge and position of the beam from the beamline data, use getEBeam(). It returns a class/structure that has the following members/fields:

Code Block
nonenone
titlegetEBeam

def event(self,evt,env):

    ebeam = evt.getEBeam()
       # convert from python list to a numpy array
    try :
  self.array =     beamChrg = ebeam.fEbeamCharge
        beamEnrg = ebeam.fEbeamL3Energynp.array( self.array )

       # beamPosX = ebeam.fEbeamLTUPosXplot graph
       plt.plot(self.array)

BeamLine Data: EBeam

To read out energy, charge and position of the beam from the beamline data, use getEBeam(). It returns a class/structure that has the following members/fields:

Code Block
none
none
titlegetEBeam

def event(self,evt,env):

    ebeam = evt.getEBeam() beamPosY = ebeam.fEbeamLTUPosY
        beamAngX = ebeam.fEbeamLTUAngX
        beamAngY = ebeam.fEbeamLTUAngY
    try :
   beamPkCr     beamChrg = ebeam.fEbeamPkCurrBC2fEbeamCharge
        printbeamEnrg = "ebeam: ", beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr
ebeam.fEbeamL3Energy
        beamPosX = ebeam.fEbeamLTUPosX
        beamPosY = ebeam.fEbeamLTUPosY
     except:
   beamAngX = ebeam.fEbeamLTUAngX
   print "No EBeam object found"

BeamLine Data: FEE Gas Detector

To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet(). This returns and array of 4 numbers:

Code Block
nonenone
titlegetFeeGasDet
  beamAngY = ebeam.fEbeamLTUAngY
    fee_energy_array    beamPkCr = evtebeam.getFeeGasDet()fEbeamPkCurrBC2
    gdENRC11   = fee_energy_array[0]
    gdENRC12 = fee_energy_array[1]
    gdENRC21 = fee_energy_array[2] print "ebeam: ", beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr
    except:
    gdENRC22 = fee_energy_array[3]

  print "No energyEBeam object found"

BeamLine Data: FEE Gas Detector

To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet(). This returns and array of 4 numbers:

Code Block
none
none
titlegetFeeGasDet
= (gdENRC21 - gdENRC22) / 2
    # or use the first two that has a different gain:fee_energy_array = evt.getFeeGasDet()
    gdENRC11 = fee_energy_array[0]
    gdENRC12 = fee_energy_array[1]
    gdENRC21 = fee_energy_array[2]
    gdENRC22 = fee_energy_array[3]

    energy = (gdENRC11gdENRC21 -+ gdENRC12gdENRC22) / 2
.0
    # or use the first two that has a different gain:
    energy = (gdENRC11 + gdENRC12) / 2.0

BeamLine BeamLine Data: Phase Cavity

To read out fit time and charge of the phase cavity, use getPhaseCavity() which returns a structure with the following fields:

Code Block
none
none
titlegetPhaseCavity
     pc = evt.getPhaseCavity()
     try:
         pcFitTime1 = pc.fFitTime1
         pcFitTime2 = pc.fFitTime2
         pcCharge1 = pc.fCharge1
         pcCharge2 = pc.fCharge2
         print "PhaseCavity: ", pcFitTime1,  pcFitTime2, pcCharge1, pcCharge2
      except :
         print "No Phase Cavity object found"

...

Event code

Code Block
none
none
titleEncoderDataEnvData
def event(self, evt, env):
    try:
        encoder evrdata = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
  getEvrData("NoDetector-0|Evr-0")
    
    for i encoder_valuein =range encoder(evrdata.valuenumFifoEvents())
    except:
        print "No encoder found in this event"
        return

...

Event code: ", evrdata.fifoEvent(i).EventCode

In the example above, the address of the EvrData object is given as "NoDetector-0|Evr-0". The address may be different in other cases, so make sure you have the correct address. If you don't know what it is, you can use 'pyxtcreader -vv <xtcfile> | less' to browse your xtcfile and look for it. Look for a lines with 'contains=EvrConfig_V' or 'contains=EvrData_V'. The address will be found on the same line in 'src=DetInfo(<address>)'

Encoder data (delay scanner)

Code Block
none
none
titleEncoderData
    # Encoder Parameters to convert to picoseconds
def event(self,evt,env):
    try:
        delay_aencoder = -80.0e-6;
    delay_b = 0.52168;
evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
        delayencoder_cvalue = 299792458;encoder.value()
    delay_0 = 0;

except:
      delay_time = (delay_a *print "No encoder_value + delay_b)*1.e-3 / delay_c) found in this event"
    delay_time = 2 * delay_time / 1.0e-12 + delay_0 + pcFitTime1

Time data

 return

You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!)The time of the event can be obtained within the event function:

Code Block
none
nonetitlegetTime
def    event# (Encoder self,Parameters evt,to envconvert )to :picoseconds
    eventdelay_timea = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )

IPIMB diode data

This is data from sets of 4 diodes around the beam line (Intensity Position, Intensity Monitoring Boards)
that measures the beam intensity in four spots, from which we can also deduce the position of the beam.

-80.0e-6;
    delay_b = 0.52168;
    delay_c = 299792458;
    delay_0 = 0;

    delay_time = (delay_a * encoder_value + delay_b)*1.e-3 / delay_c) 
    delay_time = 2 * delay_time / 1.0e-12 + delay_0 + pcFitTime1

Time data

The time of the event can be obtained within the event Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:

Code Block
none
none
titleDetInfogetTime
def event ( self, evt, env ) :
    # raw data
    ipmRaw event_time = evt.getgetTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )

IPIMB diode data

This is data from sets of 4 diodes around the beam line (Intensity Position, Intensity Monitoring Boards)
that measures the beam intensity in four spots, from which we can also deduce the position of the beam.

Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:

Code Block
none
none
titleDetInfo

def event(self, evt, env):
    # raw data
    ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source )
    try:xtc.TypeId.Type.Id_IpimbData, source )
    try:
        ch = [ipmRaw.channel0(),
              ipmRaw.channel1(),
              ipmRaw.channel2(),
              ipmRaw.channel3() ]
                
        ch_volt = [ipmRaw.channel0Voltschannel0(),
                   ipmRaw.channel1Voltschannel1(),
                   ipmRaw.channel2Voltschannel2(),
                   ipmRaw.channel3Voltschannel3() ]
    except:
        pass

    #
 feature-extracted data
    ipmFex  ch_volt = evt[ipmRaw.get(xtc.TypeId.Type.Id_IpmFex, source )
channel0Volts(),
      try:
         # array of 4 numbers ipmRaw.channel1Volts(),
         fex_channel = ipmFex.channel 

       ipmRaw.channel2Volts(),
  # scalar values
         fex_sum = ipmFex.sum 
    ipmRaw.channel3Volts()]
     fex_xpos = ipmFex.xpos
except:
        pass

    # feature-extracted data
    fex_yposipmFex = ipmFex.ypos

evt.get(xtc.TypeId.Type.Id_IpmFex, source )
     excepttry:
         pass

Code Block
nonenone
titleBldInfo

def event(self, evt, env):# array of 4 numbers
    ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
   fex_channel #= or equivalently:ipmFex.channel 

    # ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try: 
 # scalar values
         fex_sum ### Raw data ###= ipmFex.sum 
        # arraysfex_xpos of 4 numbers:
= ipmFex.xpos
         chfex_ypos = [ipm.ipimbData.channel0(),ipmFex.ypos

     except:
         pass

Code Block
none
none
titleBldInfo

def event(self, evt, env):
ipm.ipimbData.channel1(),
      ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
    # or equivalently:
    # ipm.ipimbData.channel2(), = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try: 
         ipm.ipimbData.channel3()]
  ### Raw data ###
      ch_volt =  # arrays of 4 numbers:
        ch = [ipm.ipimbData.channel0Voltschannel0(),
                   ipm.ipimbData.channel1Voltschannel1(),
                   ipm.ipimbData.channel2Voltschannel2(),
                   ipm.ipimbData.channel3Voltschannel3()]

        ### Feature-extracted data ###ch_volt = [ipm.ipimbData.channel0Volts(),
        # array of 4 numbers:
        fex_channels = ipm.ipmFexData.channel ipm.ipimbData.channel1Volts(),
        
        # scalars:
  ipm.ipimbData.channel2Volts(),
      fex_sum = ipm.ipmFexData.sum 
        fex_xpos = ipm.ipmFexData.xposipimbData.channel3Volts()]

        fex_ypos = ipm.ipmFexData.ypos

### Feature-extracted data ###
     except:
   # array of 4 numbers:
   pass

Acqiris waveform data

This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.

Initialize class members:

Code Block
nonenone

    def fex__init__ ( self ):channels = ipm.ipmFexData.channel 
        #
 initialize data       # scalars:
        self.addressfex_sum =  "AmoITof-0|Acqiris-0"ipm.ipmFexData.sum 
        self.datafex_xpos = []ipm.ipmFexData.xpos
        self.counterfex_ypos = 0
ipm.ipmFexData.ypos

     except:
         pass

Acqiris waveform data

This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.

Initialize class membersIf you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:

Code Block
none
none
    def beginjob__init__ ( self, evt, env ) : ):
        # initialize data
        cfgself.address = env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address )  "AmoITof-0|Acqiris-0"
        self.data = []
        self.numcounter = cfg.nbrChannels()
0

If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig objectThe read the event waveform data (an array) and append it to the self.data list:

Code Block
none
none
    def eventbeginjob ( self, evt, env ) :
        channelcfg = 0 env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address )
        acqDataself.num = evtcfg.nbrChannels()

The read the event waveform data (an array) and append it to the self.data list:

Code Block
none
none

    def event ( self, evt, env ) :getAcqValue( self.address, channel, env)
        ifchannel acqData= :0
        acqData = evt.getAcqValue(  self.counter+=1
  address, channel, env)
        if acqData :
            self.counter+=1
            wf = acqData.waveform()   # returns a waveform array of numpy.ndarray type.
            self.data.append(wf)

...

Code Block
none
none
titlegetPnCcdValue
def event(self,evt,env):
    try:
        frame = evt.getPnCcdValue( self.source, env )
        image = frame.data()
    except:
        pass

Other image (FCCD*,Opal,PIM (TM6740), ... )

These all use the generic getFrameValue function:

...

Code Block
none
none
titlegetCsPadQuads
def event(self,evt,env):
    quads = evt.getCsPadQuads(self.img_source, env)
    if not quads :
        print '*** cspad information is missing ***'
        return
        
    # dump information about quadrants
    print "Number of quadrants: %d" % len(quads)
    
    for q in quads:
        print "  Quadrant %d" % q.quad()
        print "    virtual_channel: %s" % q.virtual_channel()
        print "    lane: %s" % q.lane()
        print "    tid: %s" % q.tid()
        print "    acq_count: %s" % q.acq_count()
        print "    op_code: %s" % q.op_code()
        print "    seq_count: %s" % q.seq_count()
        print "    ticks: %s" % q.ticks()
        print "    fiducials: %s" % q.fiducials()
        print "    frame_type: %s" % q.frame_type()
        print "    sb_temp: %s" % map(q.sb_temp, range(4))
            
        # image data as 3-dimentional array
        data = q.data()

data2 will give you the third section stored, but be aware that sections sometimes are missing,
and in this case So far so good. 'quads' is a list of CsPad Element objects, and not necessarily ordered in the expected way. So you'll need to check with the configuration information that you can obtain in beginjob:use q.quad() to obtain the quad number.
q.data() gives you a 3D numpy array [row][col][sec]. Here sections will be ordered as expected, but be aware in case of missing sections, that you may need to check the
configuration object. You can get that from the env object, typically something you do in beginjob:

Code Block
none
none

def beginjob(self,evt,env):
    config = 
Code Block
nonenone

def beginjob(self,evt,env):
    config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
    if not config:
        print '*** cspad config object is missing ***'
        return
        
    quads = range(4)

    print 
    print "print "Cspad configuration"
    print "  N quadrants   : %d" % config.numQuads()
    print "  Quad mask     : %#x" % config.quadMask()
    print "  payloadSize   : %d" % config.payloadSize()
    print "  badAsicMask0  : %#x" % config.badAsicMask0()
    print "  badAsicMask1  : %#x" % config.badAsicMask1()
    print "  asicMask      : %#x" % config.asicMask()
    print "  numAsicsRead  : %d" % config.numAsicsRead()
    try.badAsicMask0()
    print "  badAsicMask1  : %#x" % config.badAsicMask1()
    print "  asicMask      : %#x" % config.asicMask()
    print "  numAsicsRead  : %d" % config.numAsicsRead()

   # get the indices of sections in use:
   qn = range(0,config.numQuads())               
   self.sections = map(config.sections, qn )        

If you want to draw the whole CsPad image, there's currently no pyana function that does this. Pyana only supplies the pixels in a numpy array, and the
exact location of each pixel depends on the conditions at the time of data collection. A simplified way of making the image can be seen in cspad_simple.py(newer version (cspad.py) available if you check out the XtcExplorer package).

CSPad pixel coordinates.

The CSPad detector image can be drawn by positioning the sections from the data array into a large image array. This is done in cspad_simple.py above. The positions are extracted from optical meterology measurements and additional calibrations. Alternatively one can find the coordinate of each individual pixel from a pixel map, based on the same optical metrology measurements. This is described in details here

Epics Process Variables and ControlConfig

EPICS data is different from DAQ event data. It stores the conditions and settings of the instruments, but values typically change more slowly than your
average shot-by-shot data, and EPICS data is typically updated only when it changes, or every second, or similar. It is not stored in the 'evt' (event) object,
but in the 'env' (environment) object. You typically would read it only at the beginning of each job or if your doing a scan, you'd read it in every calibration cycle:

Code Block
none
none
titleenv.epicsStore()

def begincalibcycle(self,evt,env):

    ## The returned value should be of the type epics.EpicsPvTime.
    pv = env.epicsStore().value( pv_name )
    if not pv:
        # older versions maylogging.warning('EPICS PV %s does not have all methodsexist', pv_name)
    else:
    print "  roiMask value = pv.value 
   : [%s]" % ', '.join([hex(config.roiMask(q)) for q in quads]) status = pv.status 
        alarm_severity = pv.severity 
Code Block
none
none
titleControlConfig

def begincalibcycle(self,evt,env):
    ctrl_config = env.getConfig(xtc.TypeId.Type.Id_ControlConfigprint "  numAsicsStored: %s" % str(map(config.numAsicsStored, quads))
    except:
    nControls = ctrl_config.npvControls()
  pass
  for ic in range print(0, "nControls ):

 sections      : %s"cpv % str(map(config.sections, quads))= ctrl_config.pvControl(ic)
    print
    name = cpv.name()
        value = cpv.value()