...
This page holds a few example code-snippets for use in pyana analysis. The analysis is written in python and uses MatPlotLib.PyPlot for plotting of data. Compare with myana user examples to see how (some of) the same things can be done using the myana analysis framework.
BeamLine Data: EBeam
To read out energy, charge and position of the beam from the beamline data, use getEBeam()
. It returns a class/structure that has the following members/fields:
The most reliable place for up-to-date information about all the event getters in pyana, see: https://confluence.slac.stanford.edu/display/PCDS/Pyana+Reference+Manual#PyanaReferenceManual-Classpyana.event.Event
For all the examples, you may assume that the pyana module contains a class with at least 'beginjob', 'event' and 'endjob' functions that starts something like this:
Code Block |
---|
| none |
---|
| none |
---|
title | outline of a pyana module |
---|
|
import numpy as np
import matplotlib.pyplot as plt
from pypdsdata import xtc
class mypyana(object):
def __init__(self,source="") |
Code Block |
---|
none | none |
def event(self,evt,env):
ebeam = evt.getEBeam()
try :
beamChrgself.source = ebeam.fEbeamChargesource
beamEnrgself.counter = ebeam.fEbeamL3EnergyNone
beamPosXself.array = ebeam.fEbeamLTUPosX
[] # really just a list
beamPosY = ebeam.fEbeamLTUPosY def beginjob(self,evt,env):
beamAngXself.counter = ebeam.fEbeamLTUAngX 0
def event(self,evt,env):
beamAngYself.counter += ebeam.fEbeamLTUAngY1
beamPkCr# = ebeam.fEbeamPkCurrBC2
snippet code goes here
print "ebeam: ",thedata beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr
except= evt.get(xtc.TypeId.Type.Id_SomeType, self.source )
self.array.append( thedata.somevalue )
def endjob(self,evt,env):
print "No EBeam object found"
|
BeamLine Data: FEE Gas Detector
To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet()
. This returns and array of 4 numbers:
...
fee_energy_array = evt.getFeeGasDet()
gdENRC11 = fee_energy_array[0]
gdENRC12 = fee_energy_array[1]
gdENRC21 = fee_energy_array[2]
gdENRC22 = fee_energy_array[3]
energy = (gdENRC21 - gdENRC22) / 2
# or use the first two that has a different gain:
energy = (gdENRC11 - gdENRC12) / 2
Job done! Processed %d events. " % self.counter
# place for plotting etc
# convert from python list to a numpy array
self.array = np.array( self.array )
# plot graph
plt.plot(self.array)
|
BeamLine Data: EBeam
To read out energy, charge and position of the beam from the beamline data, use getEBeam()
. It returns a class/structure that has the following members/
BeamLine Data: Phase Cavity
To read out fit time and charge of the phase cavity, use getPhaseCavity()
which returns a structure with the following fields:
Code Block |
---|
|
def event(self,evt,env):
pcebeam = evt.getPhaseCavitygetEBeam()
try try:
beamChrg pcFitTime1 = pcebeam.fFitTime1fEbeamCharge
pcFitTime2beamEnrg = pcebeam.fFitTime2fEbeamL3Energy
pcCharge1beamPosX = pcebeam.fCharge1fEbeamLTUPosX
beamPosY pcCharge2 = pcebeam.fCharge2fEbeamLTUPosY
beamAngX print "PhaseCavity= ebeam.fEbeamLTUAngX
beamAngY = ebeam.fEbeamLTUAngY
beamPkCr = ebeam.fEbeamPkCurrBC2
print "ebeam: ", pcFitTime1beamChrg, beamEnrg, pcFitTime2beamPosX, pcCharge1beamPosY, pcCharge2
beamAngX, beamAngY, beamPkCr
except :
print "No PhaseEBeam Cavity object found"
|
...
BeamLine Data: FEE Gas Detector
To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet()
. This returns and array of 4 numbers:
Code Block |
---|
| none |
---|
| none |
---|
title | getFeeGasDet |
---|
|
def event(self,evt,env):
try: fee_energy_array = evt.getFeeGasDet()
gdENRC11 encoder = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
encoder_value = encoder.value()
except:= fee_energy_array[0]
gdENRC12 = fee_energy_array[1]
gdENRC21 = fee_energy_array[2]
gdENRC22 = fee_energy_array[3]
energy = (gdENRC21 + printgdENRC22) "No encoder found in this event"
return
|
You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!):
Code Block |
---|
none | none | / 2.0
# or use the first two that has a different gain:
#energy Encoder= Parameters(gdENRC11 to+ convertgdENRC12) / 2.0
|
BeamLine Data: Phase Cavity
To read out fit time and charge of the phase cavity, use getPhaseCavity()
which returns a structure with the following fields:
Code Block |
---|
| none |
---|
| none |
---|
title | getPhaseCavity |
---|
|
to picoseconds
delay_a pc = -80.0e-6;evt.getPhaseCavity()
delay_b = 0.52168; try:
delay_c = 299792458;
pcFitTime1 delay_0 = 0;
pc.fFitTime1
delay_time = (delay_a * encoder_valuepcFitTime2 + delay_b)*1.e-3 / delay_c)
= pc.fFitTime2
delay_time = 2pcCharge1 * delay_time / 1.0e-12 + delay_0 + pcFitTime1
|
Time data
The time of the event can be obtained within the event function:
Code Block |
---|
none | none |
def event ( self, evt, env ) := pc.fCharge1
pcCharge2 = pc.fCharge2
event_time = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )
|
IPIMB diode data
This is data from sets of 4 diodes around the beam line (Intensity Position, Intensity Monitoring Boards)
that measures the beam intensity in four spots, from which we can also deduce the position of the beam.
Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:
print "PhaseCavity: ", pcFitTime1, pcFitTime2, pcCharge1, pcCharge2
except :
print "No Phase Cavity object found"
|
Event code
Code Block |
---|
|
def event(self, evt, env):
evrdata = evt.getEvrData("NoDetector-0|Evr-0")
for i in range (evrdata.numFifoEvents()):
print "Event code: ", evrdata.fifoEvent(i).EventCode
|
In the example above, the address of the EvrData object is given as "NoDetector-0|Evr-0". The address may be different in other cases, so make sure you have the correct address. If you don't know what it is, you can use 'pyxtcreader -vv <xtcfile> | less' to browse your xtcfile and look for it. Look for a lines with 'contains=EvrConfig_V' or 'contains=EvrData_V'. The address will be found on the same line in 'src=DetInfo(<address>)'
Encoder data (delay scanner)
Code Block |
---|
|
def event(self,evt,env):
try:
encoder = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source )
encoder_value = encoder.value()
except:
print "No encoder found in this event"
return
|
You could combine it with phase cavity time, and compute a time delay from it, for example (I don't know the origin of these parameters!):
Code Block |
---|
|
# Encoder Parameters to convert to picoseconds
delay_a = -80.0e-6;
delay_b = 0.52168;
delay_c = 299792458;
delay_0 = 0;
delay_time = (delay_a * encoder_value + delay_b)*1.e-3 / delay_c)
delay_time = 2 * delay_time / 1.0e-12 + delay_0 + pcFitTime1
|
Time data
The time of the event can be obtained within the event function:
Code Block |
---|
|
def event ( self, evt, env ) :
event_time = evt.getTime().seconds() + 1.0e-9*evt.getTime().nanoseconds() )
|
IPIMB diode data
This is data from sets of 4 diodes around the beam line (Intensity Position, Intensity Monitoring Boards)
that measures the beam intensity in four spots, from which we can also deduce the position of the beam.
Currently there are two data structures that holds data from the same type of devices. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:
Code Block |
---|
|
def event(self, evt, env):
# raw data
ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source )
try:
ch = [ipmRaw.channel0(),
ipmRaw.channel1(),
ipmRaw.channel2(),
ipmRaw.channel3() ]
ch_volt = [ipmRaw.channel0Volts(),
ipmRaw.channel1Volts(),
ipmRaw.channel2Volts(),
ipmRaw.channel3Volts()]
except:
pass
# feature-extracted data
ipmFex = evt.get(xtc.TypeId.Type.Id_IpmFex, source )
try:
# array of 4 numbers
fex_channel = ipmFex.channel
# scalar values
fex_sum = ipmFex.sum
fex_xpos = ipmFex.xpos
fex_ypos = ipmFex.ypos
except:
pass
|
Code Block |
---|
|
def event(self, evt, env):
ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
# or equivalently:
# ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
try:
### Raw data ###
# arrays of 4 numbers:
ch = [ipm.ipimbData.channel0(),
ipm.ipimbData.channel1(),
ipm.ipimbData.channel2(),
ipm.ipimbData.channel3()]
ch_volt = [ipm.ipimbData.channel0Volts(),
ipm.ipimbData.channel1Volts(),
ipm.ipimbData.channel2Volts(),
ipm.ipimbData.channel3Volts()]
### Feature-extracted data ###
# array of 4 numbers:
fex_channels = ipm.ipmFexData.channel
# scalars:
fex_sum = ipm.ipmFexData.sum
fex_xpos = ipm.ipmFexData.xpos
fex_ypos = ipm.ipmFexData.ypos
except:
pass
|
Acqiris waveform data
This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.
Initialize class members:
Code Block |
---|
|
def __init__ ( self ):
# initialize data
self.address = "AmoITof-0|Acqiris-0"
self.data = []
self.counter = 0
|
If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:
Code Block |
---|
|
def beginjob ( self, evt, env ) :
cfg = env.getConfig( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address ) |
Code Block |
---|
none | none |
title | DetInfo |
---|
def event(self, evt, env):
# raw data
ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source )
try:
ch = [ipmRaw.channel0(),
ipmRaw.channel1(),
ipmRaw.channel2(),
ipmRaw.channel3() ]
ch_voltself.num = [ipmRawcfg.channel0VoltsnbrChannels(),
|
The read the event waveform data (an array) and append it to the self.data list:
Code Block |
---|
|
def event ( self, evt, env ) :
ipmRaw.channel1Volts(),
channel = 0
acqData = evt.getAcqValue( ipmRawself.channel2Volts()address,
channel, env)
if acqData :
ipmRaw.channel3Volts() ]
except:
self.counter+=1
pass
# feature-extracted data
ipmFexwf = evtacqData.get(xtc.TypeId.Type.Id_IpmFex, source )
try:waveform() # returns a waveform array of numpy.ndarray type.
# self.data.append(wf)
|
At the end of the job, take the average and plot it:
Code Block |
---|
|
array of 4 numbers
def endjob( self, env fex_channel = ipmFex.channel ) :
data # scalar values
fex_sum = ipmFex.sum
= np.array(self.data) # this is an array of shape (Nevents, nSamples)
# take the fex_xposmean = ipmFex.xpos
of all events for each sampling time
fex_ypos = ipmFex.ypos
xs = except:np.mean(data, axis=0)
pass
|
Code Block |
---|
none | none |
title | BldInfo |
---|
def event(self, evt, env):
plt.plot(xs)
ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02"plt.xlabel('Seconds')
# or equivalently: plt.ylabel('Volts')
# ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
try:
### Raw data ### plt.show()
|
Which gives you a plot like this
Image Added
Princeton camera image
When plotting with MatPlotLib, we don't need to set the limits of the histogram manually, thus we don't need to read the Princeton configuration for this. If we want to sum the image from several events, we must first define and initialize some variables:
Code Block |
---|
|
def __init__ ( self ):
# arrays of 4 numbers:initialize data
chself.address = [ipm.ipimbData.channel0(), "SxrEndstation-0|Princeton-0"
self.data = None
|
In each event, we add the image array returned from the getPrincetonValue function:
Code Block |
---|
| none |
---|
| none |
---|
title | getPrincetonValue |
---|
|
def event ( ipm.ipimbData.channel1(),
self, evt, env ) :
frame = evt.getPrincetonValue( ipm.ipimbData.channel2(),self.address, env)
if ipm.ipimbData.channel3()]frame :
ch_volt = [ipm.ipimbData.channel0Volts(),
# accumulate the data
if self.data is ipm.ipimbData.channel1Volts(),None :
self.data = ipm.ipimbData.channel2Volts(),np.float_(frame.data())
else :
ipm.ipimbData.channel3Volts()]
### Feature-extracted data ###
self.data += frame.data()
|
At the end of the job, display/save the array:
Code Block |
---|
def endjob( self, env ) :
plt.imshow( self.data/self.countpass, # array of 4 numbers:
origin='lower')
plt.colorbar()
fex_channels = ipm.ipmFexData.channel plt.show()
# save the full image to a #png scalars:file
plt.imsave(fname="pyana_princ_image.png",arr=self.data, origin='lower')
|
Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, lower resolution, you can save it from the interactive window you get from plt.show().
Image Added
PnCCD image
Code Block |
---|
| none |
---|
| none |
---|
title | getPnCcdValue |
---|
|
def event(self,evt,env):
try:
fex_sum = ipm.ipmFexData.sum
fex_xpos = ipm.ipmFexData.xpos
fex_ypos = ipm.ipmFexData.ypos
except:
frame pass
|
Acqiris waveform data
This method can be used for any detector/device that has Acqiris waveform data. Edit the self.address field to get the detector of your choice.
Initialize class members:
Code Block |
---|
def __init__ ( self ):
# initialize data= evt.getPnCcdValue( self.source, env )
image = frame.data()
except:
self.address = "AmoITof-0|Acqiris-0"
self.data = []pass
|
Other image (FCCD*,Opal,PIM (TM6740), ... )
These all use the generic getFrameValue function:
Code Block |
---|
| none |
---|
| none |
---|
title | getFrameValue |
---|
|
def event(self,evt,env):
try:
self.counterframe = 0
|
If you're curious to see any of the Acqiris configuration, e.g. how many channels do we have, you can inspect the AcqConfig object:
Code Block |
---|
def beginjob ( self, evt, env ) :evt.getFrameValue( self.source )
cfgimage = envframe.getConfigdata( _pdsdata.xtc.TypeId.Type.Id_AcqConfig, self.address ))
except:
self.num = cfg.nbrChannels()
|
The read the event waveform data (an array) and append it to the self.data list:
FCCD (Fast CCD) image
The Fast CCD is read out as two 8-bit images, therefore you need this extra line to convert it to the right format.
Code Block |
---|
| none |
---|
| none |
---|
title | getFrameValue |
---|
|
def event(self,evt,env):
try |
Code Block |
---|
def event ( self, evt, env ) :
channel = 0
acqData = evt.getAcqValue( self.address, channel, env)
if acqData :
frame = evt.getFrameValue( self.counter+=1source )
image wf = acqDataframe.waveformdata()
# except:
returns a waveform array of numpy.ndarray type. pass
# convert to 16-bit integer
image.dtype = selfnp.data.append(wf)
|
At the end of the job, take the average and plot it:
CsPad data
Here's an example of getting CsPad data from an event:
Code Block |
---|
| none |
---|
| none |
---|
title | getCsPadQuads |
---|
|
def event(self,evt,env): |
Code Block |
---|
def endjob( self, env ) :
quads data = npevt.arraygetCsPadQuads(self.data) # this is an array of shape (Nevents, nSamples)
img_source, env)
if not quads :
# take the mean of all events for each sampling time
print '*** cspad information is missing ***'
xs = np.mean(data, axis=0)
return
plt.plot(xs)
# dump information plt.xlabel('Seconds')about quadrants
print "Number of quadrants: plt.ylabel('Volts'%d" % len(quads)
plt.show()
|
Which gives you a plot like this
Image Removed
Display images from princeton camera
When plotting with MatPlotLib, we don't need to set the limits of the histogram manually, thus we don't need to read the Princeton configuration for this. If we want to sum the image from several events, we must first define and initialize some variables:
Code Block |
---|
def __init__ ( self ):
# initialize data
self.address = "SxrEndstation-0|Princeton-0"
for q in quads:
print " Quadrant %d" % q.quad()
print " virtual_channel: %s" % q.virtual_channel()
print " lane: %s" % q.lane()
self.data = None
|
In each event, we add the image array returned from the getPrincetonValue function:
Code Block |
---|
def event ( self, evt, env ) :
print " tid: %s" % q.tid()
frame = evt.getPrincetonValue( self.address, env)
if frame :
print " acq_count: %s" % q.acq_count()
print " # accumulate the data
op_code: %s" % q.op_code()
print " if self.data is None : seq_count: %s" % q.seq_count()
print " ticks: %s" self.data = np.float_(frame.data())% q.ticks()
print " else fiducials: %s" % q.fiducials()
print " frame_type: %s" % selfq.data += frame.dataframe_type()
|
At the end of the job, display/save the array:
Code Block |
---|
def endjob( self, env ) :
print " sb_temp: %s" % plt.imshow( self.data/self.countpass, origin='lower'map(q.sb_temp, range(4))
plt.colorbar()
plt.show()
# image data as # save the full image to a png file
plt.imsave(fname="pyana_princ_image.png",arr=self.data, origin='lower')
|
Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, lower resolution, you can save it from the interactive window you get from plt.show().
Image Removed
CsPad data
Here's an example of getting CsPad data from an event:
3-dimentional array
data = q.data()
|
So far so good. 'quads' is a list of CsPad Element objects, and not necessarily ordered in the expected way. So you'll need to use q.quad() to obtain the quad number.
q.data() gives you a 3D numpy array [row][col][sec]. Here sections will be ordered as expected, but be aware in case of missing sections, that you may need to check the
configuration object. You can get that from the env object, typically something you do in beginjob:
Code Block |
---|
|
def beginjob(self,evt,env):
config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
if not config:
print '*** cspad config object is missing ***'
|
Code Block |
---|
none | none |
def event(self,evt,env):
quads = evt.getCsPadQuads(self.img_source, env)
if not quads :
print '*** cspad information is missing ***'
return
# dump information about quadrants
print "Number of quadrants: %d" % len(quads)
for q in quads:
return print " Quadrant %d" % q.quad()
print "Cspad configuration"
print " N quadrants virtual_channel: %s%d" % qconfig.virtual_channelnumQuads()
print " Quad printmask " lane: %s%#x" % qconfig.lanequadMask()
print " payloadSize tid : %s%d" % qconfig.tidpayloadSize()
print " badAsicMask0 acq_count: %s%#x" % qconfig.acq_countbadAsicMask0()
print " badAsicMask1 op_code: %s%#x" % qconfig.op_codebadAsicMask1()
print " asicMask print " seq_count: %s%#x" % qconfig.seq_countasicMask()
print " numAsicsRead ticks: %s%d" % qconfig.ticksnumAsicsRead()
# get the indices of printsections "in use:
fiducials:qn %s" % q.fiducials()
= range(0,config.numQuads()) print " frame_type: %s" % q.frame_type()
self.sections = map(config.sections, qn ) print " sb_temp: %s" % map(q.sb_temp, range(4))
# image data as 3-dimentional array
data = q.data()
|
data2 will give you the third section stored, but be aware that sections sometimes are missing,
and in this case you'll need to check with the configuration information that you can obtain in beginjob:
If you want to draw the whole CsPad image, there's currently no pyana function that does this. Pyana only supplies the pixels in a numpy array, and the
exact location of each pixel depends on the conditions at the time of data collection. A simplified way of making the image can be seen in cspad_simple.py(newer version (cspad.py) available if you check out the XtcExplorer package).
CSPad pixel coordinates.
The CSPad detector image can be drawn by positioning the sections from the data array into a large image array. This is done in cspad_simple.py above. The positions are extracted from optical meterology measurements and additional calibrations. Alternatively one can find the coordinate of each individual pixel from a pixel map, based on the same optical metrology measurements. This is described in details here
Epics Process Variables and ControlConfig
EPICS data is different from DAQ event data. It stores the conditions and settings of the instruments, but values typically change more slowly than your
average shot-by-shot data, and EPICS data is typically updated only when it changes, or every second, or similar. It is not stored in the 'evt' (event) object,
but in the 'env' (environment) object. You typically would read it only at the beginning of each job or if your doing a scan, you'd read it in every calibration cycle:
Code Block |
---|
| none |
---|
| none |
---|
title | env.epicsStore() |
---|
|
def begincalibcycle(self,evt,env):
## The returned value should be of the type epics.EpicsPvTime.
pv = env.epicsStore().value( pv_name )
if not pv:
logging.warning('EPICS PV %s does not exist', pv_name)
else:
value = pv.value
status = pv.status |
Code Block |
---|
none | none |
def beginjob(self,evt,env):
config = env.getConfig(xtc.TypeId.Type.Id_CspadConfig, self.img_source)
if not config:
print '*** cspad config object is missing ***'
return
quads = range(4)
print
print "Cspad configuration"
print " N quadrants : %d" % config.numQuads()
print " Quad mask : %#x" % config.quadMask()
print " payloadSize : %d" % config.payloadSize()
print " badAsicMask0 : %#x" % config.badAsicMask0()
print " badAsicMask1 : %#x" % config.badAsicMask1()
print " asicMask : %#x" % config.asicMask()
print " numAsicsRead : %d" % config.numAsicsRead()
try:
# older versions may not have all methods
printalarm_severity "= roiMask pv.severity
|
Code Block |
---|
| none |
---|
| none |
---|
title | ControlConfig |
---|
|
def begincalibcycle(self,evt,env):
: [%s]" % ', '.join([hex(config.roiMask(q)) for q in quads])ctrl_config = env.getConfig(xtc.TypeId.Type.Id_ControlConfig)
nControls print " numAsicsStored: %s" % str(map(config.numAsicsStored, quads))= ctrl_config.npvControls()
except:
for ic in range (0, nControls ):
pass
print "cpv sections= ctrl_config.pvControl(ic)
: %s"name % str(map(config.sections, quads))
print
= cpv.name()
value = cpv.value()
|