...
For all the examples, you may assume that the pyana module contains a class with at least 'beginjob', 'event' and 'endjob' functions that starts something like this:
Code Block |
---|
| none |
---|
| none |
---|
title | outline of a pyana modulenone |
---|
|
import numpy as np
import matplotlib.pyplot as plt
from pypdsdata import xtc
class mypyana(object):
    """Outline of a pyana user module that stores one value per event.

    It provides the 'beginjob', 'event' and 'endjob' functions that all
    the examples assume a pyana module to have.
    """

    def __init__(self, source=""):
        """Remember the data source and reset the bookkeeping.

        source -- address handed to evt.get() to select the data to read
        """
        self.source = source
        self.counter = None  # becomes 0 in beginjob
        self.array = []  # really just a list

    def beginjob(self, evt, env):
        """Start counting events from zero."""
        self.counter = 0

    def event(self, evt, env):
        """Count the event and store its value of interest."""
        self.counter += 1

        # snippet code goes here
        thedata = evt.get(xtc.TypeId.Type.Id_SomeType, self.source)
        if thedata is None:
            # presumably evt.get() gives None when the event holds no data
            # from this source -- skip the event instead of crashing
            return
        self.array.append(thedata.somevalue)

    def endjob(self, evt, env):
        """Report how many events were processed and plot the stored values."""
        # A single parenthesized argument prints the same text whether
        # print is the Python 2 statement or the Python 3 function.
        print("Job done! Processed %d events. " % self.counter)

        # place for plotting etc

        # convert from python list to a numpy array
        self.array = np.array(self.array)

        # plot graph
        plt.plot(self.array)
|
...
To read out energy, charge and position of the beam from the beamline data, use getEBeam()
. It returns a class/structure that has the following members/fields:
Code Block |
---|
| none |
---|
| none |
---|
title | getEBeamnone |
---|
|
def event(self, evt, env):
    """Print the electron-beam values of this event.

    evt -- event object; evt.getEBeam() supplies the beam data
    env -- environment object (not used here)

    Prints "No EBeam object found" when the fields cannot be read.
    """
    ebeam = evt.getEBeam()
    try:
        beamChrg = ebeam.fEbeamCharge
        beamEnrg = ebeam.fEbeamL3Energy
        beamPosX = ebeam.fEbeamLTUPosX
        beamPosY = ebeam.fEbeamLTUPosY
        beamAngX = ebeam.fEbeamLTUAngX
        beamAngY = ebeam.fEbeamLTUAngY
        beamPkCr = ebeam.fEbeamPkCurrBC2
    except AttributeError:
        # Only a failed field read is handled (presumably getEBeam() gives
        # None when the event has no EBeam data); anything else, Ctrl-C
        # included, is no longer swallowed.
        print("No EBeam object found")
        return

    # One pre-formatted string prints the same text whether print is the
    # Python 2 statement or the Python 3 function.
    print("ebeam:  %s %s %s %s %s %s %s" % (
        beamChrg, beamEnrg, beamPosX, beamPosY, beamAngX, beamAngY, beamPkCr))
|
...
To read out the energy from the front end enclosure (FEE) gas detector, use getFeeGasDet()
. This returns an array of 4 numbers:
Code Block |
---|
| none |
---|
| none |
---|
title | getFeeGasDetnone |
---|
|
# evt.getFeeGasDet() gives the four energy readings of the FEE gas detector
fee_energy_array = evt.getFeeGasDet()
gdENRC11, gdENRC12 = fee_energy_array[0], fee_energy_array[1]
gdENRC21, gdENRC22 = fee_energy_array[2], fee_energy_array[3]

# average of the last two readings
energy = (gdENRC21 + gdENRC22) / 2.0

# or use the first two, which have a different gain:
energy = (gdENRC11 + gdENRC12) / 2.0
|
...
To read out fit time and charge of the phase cavity, use getPhaseCavity()
which returns a structure with the following fields:
Code Block |
---|
| none |
---|
| none |
---|
title | getPhaseCavitynone |
---|
|
# Read the fit times and charges of the phase cavity and print them.
pc = evt.getPhaseCavity()
try:
    pcFitTime1 = pc.fFitTime1
    pcFitTime2 = pc.fFitTime2
    pcCharge1 = pc.fCharge1
    pcCharge2 = pc.fCharge2
except AttributeError:
    # Only a failed field read is handled (presumably getPhaseCavity()
    # gives None when the event has no phase cavity data); other errors,
    # Ctrl-C included, are no longer swallowed.
    print("No Phase Cavity object found")
else:
    # One pre-formatted string prints the same text whether print is the
    # Python 2 statement or the Python 3 function.
    print("PhaseCavity:  %s %s %s %s" % (
        pcFitTime1, pcFitTime2, pcCharge1, pcCharge2))
|
Event code
Code Block |
---|
|
def event(self, evt, env):
    """Print the event code of every FIFO event in the EVR data.

    evt -- event object; the EVR data is read from "NoDetector-0|Evr-0"
    env -- environment object (not used here)
    """
    evrdata = evt.getEvrData("NoDetector-0|Evr-0")
    if evrdata is None:
        # presumably getEvrData() gives None when the event carries no EVR
        # data -- report it like the other examples instead of crashing
        print("No EVR data found in this event")
        return

    for i in range(evrdata.numFifoEvents()):
        # One pre-formatted string prints the same text whether print is
        # the Python 2 statement or the Python 3 function.
        print("Event code:  %s" % (evrdata.fifoEvent(i).EventCode,))
|
...
Encoder data (delay scanner)
Code Block |
---|
| none |
---|
| none |
---|
title | EncoderDatanone |
---|
|
def event(self, evt, env):
    """Read the encoder value (delay scanner) of this event.

    evt -- event object; the encoder data is looked up with self.enc_source
    env -- environment object (not used here)

    Prints "No encoder found in this event" and returns when the value
    cannot be read.
    """
    try:
        encoder = evt.get(xtc.TypeId.Type.Id_EncoderData, self.enc_source)
        encoder_value = encoder.value()
    except Exception:
        # Still best-effort for any error while reading, but unlike a bare
        # 'except:' this lets Ctrl-C and sys.exit() through.
        print("No encoder found in this event")
        return
|
...
The time of the event can be obtained within the event function:
Code Block |
---|
|
def event(self, evt, env):
    """Compute the time of the event as one floating-point number.

    evt -- event object; evt.getTime() supplies seconds() and nanoseconds()
    env -- environment object (not used here)
    """
    timestamp = evt.getTime()
    # whole seconds plus the nanosecond part scaled to seconds
    event_time = timestamp.seconds() + 1.0e-9 * timestamp.nanoseconds()
|
...
Currently there are two data structures that hold data from the same type of device. Depending on DAQ
configuration, they are either DetInfo type or BldInfo type. Here are examples for extracting both types
in the user module event function:
Code Block |
---|
|
def event(self, evt, env):
    """Extract the raw and the feature-extracted IPIMB data of this event.

    evt -- event object the data is read from
    env -- environment object (not used here)
    """
    # NOTE(review): 'source' is not defined in this snippet; it has to be
    # supplied by the surrounding module -- confirm where it comes from.

    # raw data
    ipmRaw = evt.get(xtc.TypeId.Type.Id_IpimbData, source)
    try:
        ch = [ipmRaw.channel0(),
              ipmRaw.channel1(),
              ipmRaw.channel2(),
              ipmRaw.channel3()]

        ch_volt = [ipmRaw.channel0Volts(),
                   ipmRaw.channel1Volts(),
                   ipmRaw.channel2Volts(),
                   ipmRaw.channel3Volts()]
    except AttributeError:
        # Presumably ipmRaw is None when the event has no raw data; only
        # that case is skipped, other errors are no longer swallowed.
        pass

    # feature-extracted data
    ipmFex = evt.get(xtc.TypeId.Type.Id_IpmFex, source)
    try:
        # array of 4 numbers
        fex_channel = ipmFex.channel

        # scalar values
        fex_sum = ipmFex.sum
        fex_xpos = ipmFex.xpos
        fex_ypos = ipmFex.ypos
    except AttributeError:
        # Presumably ipmFex is None when the event has no fex data.
        pass
|
Code Block |
---|
|
def event(self, evt, env):
    """Extract raw and feature-extracted data of the shared IPIMB.

    evt -- event object; the data is read for "HFX-DG3-IMB-02"
    env -- environment object (not used here)
    """
    ipm = evt.getSharedIpimbValue("HFX-DG3-IMB-02")
    # or equivalently:
    # ipm = evt.get(xtc.TypeId.Type.Id_SharedIpimb, "HFX-DG3-IMB-02")
    try:
        ### Raw data ###
        # arrays of 4 numbers:
        ch = [ipm.ipimbData.channel0(),
              ipm.ipimbData.channel1(),
              ipm.ipimbData.channel2(),
              ipm.ipimbData.channel3()]
        ch_volt = [ipm.ipimbData.channel0Volts(),
                   ipm.ipimbData.channel1Volts(),
                   ipm.ipimbData.channel2Volts(),
                   ipm.ipimbData.channel3Volts()]

        ### Feature-extracted data ###
        # array of 4 numbers:
        fex_channels = ipm.ipmFexData.channel

        # scalars:
        fex_sum = ipm.ipmFexData.sum
        fex_xpos = ipm.ipmFexData.xpos
        fex_ypos = ipm.ipmFexData.ypos
    except AttributeError:
        # Presumably ipm is None when the event has no data for this
        # device; only that case is skipped, other errors (and Ctrl-C)
        # are no longer swallowed.
        pass
|
...
In each event, we add the image array returned from the getPrincetonValue function:
Code Block |
---|
| none |
---|
| none |
---|
title | getPrincetonValuenone |
---|
|
def event(self, evt, env):
    """Add the Princeton camera image of this event to the running sum.

    evt -- event object; the frame is read for self.address
    env -- environment object, passed on to evt.getPrincetonValue()

    The sum is kept in self.data, which is expected to be None before the
    first image arrives.
    """
    frame = evt.getPrincetonValue(self.address, env)
    if not frame:
        return

    # accumulate the data
    image = frame.data()
    if self.data is None:
        # Start the sum as a float64 copy of the first image.  np.float_
        # was removed in NumPy 2.0; np.array(..., dtype=np.float64) does
        # the same conversion on old and new NumPy alike.
        self.data = np.array(image, dtype=np.float64)
    else:
        self.data += image
|
...
Note that imsave saves the image only, pixel by pixel. If you want a view of the figure itself, at lower resolution, you can save it from the interactive window you get from plt.show().
![](/download/thumbnails/96708667/pyana_princ_image.gif?version=1&modificationDate=1291916324000&api=v2)
PnCCD image
Code Block |
---|
| none |
---|
| none |
---|
title | getPnCcdValuenone |
---|
|
def event(self, evt, env):
    """Read the pnCCD image of this event.

    evt -- event object; the frame is read for self.source
    env -- environment object, passed on to evt.getPnCcdValue()
    """
    try:
        frame = evt.getPnCcdValue(self.source, env)
        image = frame.data()
    except Exception:
        # Still best-effort: an event without a usable frame is skipped.
        # Unlike a bare 'except:' this lets Ctrl-C and sys.exit() through.
        return
|
...
These all use the generic getFrameValue function:
Code Block |
---|
| none |
---|
| none |
---|
title | getFrameValuenone |
---|
|
def event(self, evt, env):
    """Read the camera image of this event with the generic frame getter.

    evt -- event object; the frame is read for self.source
    env -- environment object (not used here)
    """
    try:
        frame = evt.getFrameValue(self.source)
        image = frame.data()
    except Exception:
        # Still best-effort: an event without a usable frame is skipped.
        # Unlike a bare 'except:' this lets Ctrl-C and sys.exit() through.
        return
|
...
The Fast CCD is read out as two 8-bit images, therefore you need this extra line to convert it to the right format.
Code Block |
---|
| none |
---|
| none |
---|
title | getFrameValuenone |
---|
|
def event(self, evt, env):
    """Read the Fast CCD image of this event and convert it to 16 bit.

    evt -- event object; the frame is read for self.source
    env -- environment object (not used here)
    """
    try:
        frame = evt.getFrameValue(self.source)
        image = frame.data()
    except Exception:
        # No usable frame in this event.  Return here: falling through
        # would reach the conversion below with 'image' never assigned.
        # Unlike a bare 'except:' this lets Ctrl-C and sys.exit() through.
        return

    # convert to 16-bit integer
    image.dtype = np.uint16
|
...
Here's an example of getting CsPad data from an event:
Code Block |
---|
| none |
---|
| none |
---|
title | getCsPadQuadsnone |
---|
|
def event(self, evt, env):
    """Print the header information of every CsPad quadrant in this event.

    evt -- event object; the quadrants are read for self.img_source
    env -- environment object, passed on to evt.getCsPadQuads()
    """
    quads = evt.getCsPadQuads(self.img_source, env)
    if not quads:
        print('*** cspad information is missing ***')
        return

    # dump information about quadrants
    # (each print gets one pre-formatted string, so the text is the same
    # whether print is the Python 2 statement or the Python 3 function)
    print("Number of quadrants: %d" % len(quads))
    for q in quads:
        print(" Quadrant %d" % q.quad())
        print(" virtual_channel: %s" % q.virtual_channel())
        print(" lane: %s" % q.lane())
        print(" tid: %s" % q.tid())
        print(" acq_count: %s" % q.acq_count())
        print(" op_code: %s" % q.op_code())
        print(" seq_count: %s" % q.seq_count())
        print(" ticks: %s" % q.ticks())
        print(" fiducials: %s" % q.fiducials())
        print(" frame_type: %s" % q.frame_type())
        # a real list, so the four values are printed under Python 3 too
        print(" sb_temp: %s" % [q.sb_temp(i) for i in range(4)])

        # image data as 3-dimensional array
        data = q.data()
|
...
EPICS data is different from DAQ event data. It stores the conditions and settings of the instruments, but values typically change more slowly than your
average shot-by-shot data, and EPICS data is typically updated only when it changes, or every second, or similar. It is not stored in the 'evt' (event) object,
but in the 'env' (environment) object. You typically would read it only at the beginning of each job or, if you're doing a scan, you'd read it in every calibration cycle:
Code Block |
---|
| none |
---|
| none |
---|
title | env.epicsStore()none |
---|
|
def begincalibcycle(self, evt, env):
    """Read the value, status and alarm severity of one EPICS PV.

    evt -- event object (not used here)
    env -- environment object; env.epicsStore() holds the EPICS data
    """
    # Imported here so the snippet works without relying on a module-level
    # 'import logging' that the module outline does not show.
    import logging

    # NOTE(review): 'pv_name' is not defined in this snippet; it has to be
    # supplied by the surrounding module -- confirm where it comes from.

    ## The returned value should be of the type epics.EpicsPvTime.
    pv = env.epicsStore().value(pv_name)
    if not pv:
        logging.warning('EPICS PV %s does not exist', pv_name)
        return

    value = pv.value
    status = pv.status
    alarm_severity = pv.severity
|
Code Block |
---|
| none |
---|
| none |
---|
title | ControlConfignone |
---|
|
def begincalibcycle(self, evt, env):
    """Read the name and value of every PV control in the ControlConfig.

    evt -- event object (not used here)
    env -- environment object; env.getConfig() supplies the ControlConfig
    """
    ctrl_config = env.getConfig(xtc.TypeId.Type.Id_ControlConfig)

    # visit the controls by index, 0 .. npvControls()-1
    for index in range(ctrl_config.npvControls()):
        control = ctrl_config.pvControl(index)
        name = control.name()
        value = control.value()
|