Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Publicly accessible practice data is located in S3DF in the directory /sdf/data/lcls/ds/prj/public01/xtc.  Use of this data requires the additional "dir=" keyword to the DataSource object.

...

Code Block
(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw      
hsd       | raw      
gmdstr0   | raw    
etc.  

...

Code Block
(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name             | Epics Name                           
-----------------------------------------------------------------
StaleFlags                |                                      
Keithley_Sum              | EM2K0:XGMD:HPS:KeithleySum           
IM2K4_XrayPower           | IM2K4:PPM:SPM:VOLT_RBV               
IM3K4_XrayPower           | IM3K4:PPM:SPM:VOLT_RBV               
IM4K4_XrayPower           | IM4K4:PPM:SPM:VOLT_RBV               
etc.     

...

Code Block
languagepy
from psana import DataSource

# Open the publicly accessible practice data on S3DF; the "dir=" keyword is
# required for this dataset. Limit to 100 events for a quick test.
ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc',
                max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt) # epics variables have a different syntax
    # important: always check for missing data
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs.
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# Use this function to decide if you want to fetch large data for this event
# and/or direct an event to process on a particular 'rank'
# (this rank number should be between 1 and total no. of ranks - 3
# since 3 ranks are reserved). If this detector is needed, make sure
# to define this detector in as_smds argument for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        max_events  = 10,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # smalldata callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

...

Code Block
languagepy
# Create a datasource and tell it to exclude a detector
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        xdetectors  = ['hsd'],      # all other detectors will be available
        max_events  = 10)


run = next(ds.runs())

# Create these detectors normally
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    # important: always check for missing data before using img.shape
    if img is None or photonEnergy is None: continue
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')

...

Code Block
from psana import DataSource

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='tmoc00118',run=222,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt,evt in enumerate(myrun.events()):
    # list of booleans, indexed by event-code number
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers
    print('event code 15 present:',allcodes[15])
    if nevt>20: break

...

Code Block
(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/sdf/data/lcls/ds/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw      
motor2         | raw      
step_value     | raw      
step_docstring | raw      
--------------------------
(ps-4.3.2) psanagpu102:lcls2$  

...

Code Block
from psana import DataSource

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='rixx43518',run=45,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
# scan (step) detectors: motor positions and per-step metadata
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    # scan detectors are called with the step, not the event
    print(motor1(step),motor2(step),step_value(step),step_docstring(step))
    for evt in step.events():
        pass

...

Code Block
languagepy
titlelivemode.py
# Use environment variable to specify how many attempts
# the datasource should wait for file reading (1 second wait).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'


# Create a datasource with live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        live        = True,
        max_events  = 10)


# Looping over your run and events as usual
# You'll see "wait for an event..." message in case
# the file system writing is slower than your analysis
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')

...

Code Block
languagepy
titletest_jump.py
from psana import DataSource
import numpy as np

# Read only these specific events, selected by timestamp (must be uint64).
timestamps = np.array([4194783241933859761,4194783249723600225,4194783254218190609,4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)

...

Code Block
languagepy
import datetime as dt
from psana import DataSource

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt,evt in enumerate(myrun.events()):
    if nevt>3: break
    # evt.datetime() is naive UTC; tag it as UTC then convert to local time
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)
    print(localt.strftime('%H:%M:%S'))

...

Code Block
languagepy
titleintg_det.py
# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.

import os
os.environ['PS_SMD_N_EVENTS'] = '1'
batch_size = 1

import numpy as np  # needed for np.sum/np.prod below (was missing)
from psana import DataSource
ds = DataSource(exp='xpptut15', run=1, dir='/sdf/data/lcls/ds/prj/public01/xtc/intg_det',
        intg_det='andor',
        batch_size=batch_size)
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)

    # Keep summing the value of the other detector (hsd in this case);
    # always check for missing data first
    if hsd_calib is not None:
        sum_hsd += np.sum(hsd_calib[:])/np.prod(hsd_calib.shape)

    # When an integrating event is found, print out and reset the sum variable
    if andor_calib is not None:
        val_andor = np.sum(andor_calib[:])/np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0
...

Code Block
languagepy
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES']='1'

if rank==4: # hack for now to eliminate use of publish.local below
    publish.init()

# we will remove this for batch processing and use "psplot" instead
# publish.local = True

# SRV-node callback: publish the normalized andor trace for realtime plotting
def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,"Andor (normalized)",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)

# public practice data on S3DF (requires the "dir=" keyword)
ds = DataSource(exp='rixx1003821',run=46,dir='/sdf/data/lcls/ds/prj/public01/xtc',intg_det='andor_vls',batch_size=1)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=1000, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm))
                norm=0
                ndrop_inhibit=0

...