Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We use 396 cores on 9 srcf nodes (32 of them eventbuilder cores) to keep up with all eight xtc2 files. Below are the python and bash/job scripts for slurm.


Code Block
languagepy
titletest_live.py
import time
import os,sys
from psana import DataSource
import numpy as np
import vals
from mpi4py import MPI
# Module-level MPI context: every rank shares COMM_WORLD.  `size` and
# `rank` are read by test_standard() and by the __main__ block below.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

def test_standard():
    """Count events from a live psana2 DataSource across all MPI ranks.

    Command-line contract: ``sys.argv[1]`` is the experiment name and
    ``sys.argv[2]`` is the run number.  Each rank tallies the events it
    receives, the tallies are gathered and summed on rank 0, and the
    grand total is broadcast back so every rank returns the same number.
    """
    batch_size = 1000
    max_events = 0  # 0 = no event limit

    hutch = 'tst'
    exp = sys.argv[1]
    runno = int(sys.argv[2])

    xtc_dir = f'/cds/data/drpsrcf/{hutch}/{exp}/xtc/'

    ds = DataSource(exp=exp, run=runno,
                    batch_size=batch_size, max_events=max_events,
                    dir=xtc_dir, live=True)

    # Per-rank event tally; the gather buffer exists on rank 0 only.
    sendbuf = np.zeros(1, dtype='i')
    recvbuf = np.empty([size, 1], dtype='i') if rank == 0 else None

    st = time.time()
    for run in ds.runs():
        for nevt, evt in enumerate(run.events()):
            # Report the per-rank rate once every 1000 events.
            if nevt > 0 and nevt % 1000 == 0:
                en = time.time()
                print(f'RANK: {rank:4d} EVENTS: {nevt:10d} RATE: {(1000/(en-st))*1e-3:.2f}kHz', flush=True)
                st = time.time()
            sendbuf += 1

    # Reduce per-rank tallies to one total on rank 0, then share it.
    comm.Gather(sendbuf, recvbuf, root=0)
    n_events = np.sum(recvbuf) if rank == 0 else None
    n_events = comm.bcast(n_events, root=0)
    return n_events


if __name__ == "__main__":
    comm.Barrier()
    t0 = MPI.Wtime()

    n_events = test_standard()

    comm.Barrier()
    t1 = MPI.Wtime()
    if rank == 0:
        n_eb_nodes = int(os.environ.get('PS_EB_NODES', '1'))
        print(f'TOTAL TIME:{t1-t0:.2f}s #EB: {n_eb_nodes:3d} EVENTS:{n_events:10d} RATE:{(n_events/(t1-t0))*1e-6:.2f}MHz', flush=True)


Code Block
languagebash
titlesubmit_slac.sh
#!/bin/bash
#SBATCH --partition=anaq
#SBATCH --job-name=psana2
#SBATCH --nodes=9
#SBATCH --ntasks=396
##SBATCH --ntasks-per-node=50
#SBATCH --output=%j.log
#SBATCH --exclusive


# Wall-clock start (seconds since epoch) for the summary line at the end.
t_start=`date +%s`


# setup_hosts.sh is expected to set SLURM_HOSTFILE; echo it so the task
# placement is visible in the job log.
source setup_hosts.sh
echo SLURM_HOSTFILE $SLURM_HOSTFILE SLURM_NTASKS $SLURM_NTASKS 


# Number of eventbuilder cores; read by test_live.py via os.environ for
# its final summary print.
export PS_EB_NODES=32
MAX_EVENTS=0
EXP="tstx00817"
RUNNO=55
srun ./run_slac.sh $MAX_EVENTS $EXP $RUNNO


t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))
Code Block
languagebash
titlerun_slac.sh
# Per-task launcher executed by srun; positional args: MAX_EVENTS EXP RUNNO.
t_start=`date +%s`
echo "RUN PSANA2 SCRIPT SUBMITTED AT" $t_start 

# For psana2
export PS_R_MAX_RETRIES=60
export PS_SMD_N_EVENTS=10000
export PS_FAKESTEP_FLAG=0
export PS_SMD0_NUM_THREADS=32
export OMPI_MCA_btl_tcp_if_include=172.21.164.90/1072

MAX_EVENTS=${1}
EXP=${2}
RUNNO=${3}
# NOTE(review): MAX_EVENTS is captured but never forwarded — test_live.py
# hard-codes max_events=0 and reads only EXP and RUNNO from argv.
python -u ${HOME}/psana-nersc/psana2/test_live.py $EXP $RUNNO

t_end=`date +%s`
echo "PSANA2 JOB COMPLETE AT" $t_end "TOTAL ELAPSED" $((t_end-t_start)) "N_TASKS" $SLURM_NTASKS

Run the daq on drp-srcf-mon001 with the command below:

Code Block
languagebash
procmgr start mona.cnf

Then submit a slurm job from one of the drp-srcf nodes:

Code Block
languagebash
sbatch submit_slac.sh



CNF File

Code Block
languagepy
titlemona.cnf
# NOTE(review): procmgr executes this cnf with names such as `platform`,
# `CONDA_PREFIX`, `CONFIGDIR`, `host`, `id`, `flags`, `cmd`, and `env`
# pre-defined in its namespace — they are not defined in this file.
if not platform: platform = '7'

# EPICS runtime library path, passed to processes that need channel access.
ld_lib_path = f'LD_LIBRARY_PATH={CONDA_PREFIX}/epics/lib/linux-x86_64:{CONDA_PREFIX}/pcas/lib/linux-x86_64'
epics_env = f'{ld_lib_path}'

collect_host = 'drp-srcf-mon001'

groups = platform
hutch, user, password = ('tst', 'tstopr', 'pcds')
auth = ' --user {:} --password {:} '.format(user,password)
url  = ' --url https://pswww.slac.stanford.edu/ws-auth/devlgbk/ '
cdb  = 'https://pswww.slac.stanford.edu/ws-auth/configdb/ws'

#
#  drp variables
#
prom_dir = f'/cds/group/psdm/psdatmgr/etc/config/prom/{hutch}' # Prometheus
data_dir = f'/cds/data/drpsrcf'
trig_dir = f'/cds/home/c/claus/lclsii/daq/runs/eb/data/srcf'

task_set = 'taskset -c 4-63'
batching = 'batching=yes'
directIO = 'directIO=yes'
scripts  = f'script_path={trig_dir}'
#network  = 'ep_provider=sockets,ep_domain=eno1'

# Common drp options; the 0/1 variants differ only in which datadev
# device node they attach to.
std_opts = f'-P {hutch} -C {collect_host} -M {prom_dir}' # -k {network}'
std_opts0 = f'{std_opts} -d /dev/datadev_0 -o {data_dir} -k {batching},{directIO}'
std_opts1 = f'{std_opts} -d /dev/datadev_1 -o {data_dir} -k {batching},{directIO}'

teb_cmd  = f'{task_set} teb '+        std_opts + f' -k {scripts}'
meb_cmd  = f'{task_set} monReqServer {std_opts}'

drp_cmd0 = f'{task_set} drp '+   std_opts0
drp_cmd1 = f'{task_set} drp '+   std_opts1
pva_cmd0 = f'{task_set} drp_pva {std_opts0}'
pva_cmd1 = f'{task_set} drp_pva {std_opts1}'
bld_cmd0 = f'{task_set} drp_bld {std_opts0} -k interface=eno1'
bld_cmd1 = f'{task_set} drp_bld {std_opts1} -k interface=eno1'

elog_cfg = f'/cds/group/pcds/dist/pds/{hutch}/misc/elog_{hutch}.txt'

#ea_cfg = f'/cds/group/pcds/dist/pds/{hutch}/misc/epicsArch.txt'
#ea_cfg = f'/cds/group/pcds/dist/pds/tmo/misc/epicsArch_tmo.txt'
ea_cfg = f'/cds/group/pcds/dist/pds/rix/misc/epicsArch.txt'
ea_cmd0 = f'{task_set} epicsArch {std_opts0} {ea_cfg}'
ea_cmd1 = f'{task_set} epicsArch {std_opts1} {ea_cfg}'

#
#  ami variables
#
heartbeat_period = 1000 # units are ms

ami_workers_per_node = 4
ami_worker_nodes = ["drp-srcf-cmp019"]
ami_num_workers = len(ami_worker_nodes)
ami_manager_node = "drp-srcf-cmp019"
ami_monitor_node = "drp-srcf-cmp019"

# procmgr FLAGS: <port number> static port number to keep executable
#                              running across multiple start/stop commands.
#
# HOST       UNIQUEID      FLAGS  COMMAND+ARGS
# list of processes to run
#   required fields: id, cmd
#   optional fields: host, port, flags, conda, env, rtprio
#     flags:
#        'x' or 'X'  -> xterm: open small or large xterm for process console
#        's'         -> stop: sends ctrl-c to process
#        'u'         -> uniqueid: use 'id' as detector alias (supported by acq, cam, camedt, evr, and simcam)

# NOTE(review): base_host appears unused by the entries below — confirm
# before removing.
base_host = 'drp-srcf-cmp004'  # drp-srcf-mon001

# Main daq process list: control/gui, one teb, and one timing + seven
# fakecam drp processes on drp-srcf-cmp029 (four per datadev device).
procmgr_config = [
# {                         id:'xpmpva' ,     flags:'s',   env:epics_env, cmd:f'xpmpva DAQ:NEH:XPM:0 DAQ:NEH:XPM:2 DAQ:NEH:XPM:4 DAQ:NEH:XPM:6'},
# {                         id:'groupca',     flags:'s',   env:epics_env, cmd:f'groupca DAQ:NEH 2 {groups}'},
# {                         id:'xpmpva' ,     flags:'s',   env:epics_env, cmd:f'xpmpva DAQ:NEH:XPM:0 DAQ:NEH:XPM:3 DAQ:NEH:XPM:5'},
 {                         id:'groupca',     flags:'s',   env:epics_env, cmd:f'groupca DAQ:NEH 3 {groups}'},
 {                         id:'procstat',    flags:'p',                  cmd:f'procstat {CONFIGDIR}/p{platform}.cnf.last'},

# set the phase2 transition timeout to 20s. this is because the teb
# has a 16s timeout for slow PVA detectors coming through the gateway.
# both can be reduced if/when we get consistent behavior from gateways.
# mona: -x sets XPM# (see confluence page eg. 3 is rix timing)
# {host: collect_host,      id:'control',     flags:'spu', env:epics_env, cmd:f'control -P {hutch} -B DAQ:NEH -x 2 -C BEAM {auth} {url} -d {cdb}/configDB -t trigger -S 1 -T 20000 -V {elog_cfg}'},
 {host: collect_host,      id:'control',     flags:'spu', env:epics_env, cmd:f'control -P {hutch} -B DAQ:NEH -x 3 -C BEAM {auth} {url} -d {cdb}/configDB -t trigger -S 0 -T 20000 -V {elog_cfg}'},
 {                         id:'control_gui', flags:'p',                  cmd:f'control_gui -H {collect_host} --uris {cdb} --expert {auth} --loglevel WARNING'},

 {host: 'drp-srcf-cmp019', id:'teb0',        flags:'spu',                cmd:f'{teb_cmd}'},

 {host: 'drp-srcf-cmp029', id:'timing_0',    flags:'spu', env:epics_env, cmd:f'{drp_cmd1} -l 0x1 -D ts'},
 {host: 'drp-srcf-cmp029', id:'tstcam5_0',   flags:'spu',                cmd:f'{drp_cmd1} -l 0x2 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam6_0',   flags:'spu',                cmd:f'{drp_cmd1} -l 0x4 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam7_0',   flags:'spu',                cmd:f'{drp_cmd1} -l 0x8 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam1_0',   flags:'spu',                cmd:f'{drp_cmd0} -l 0x1 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam2_0',   flags:'spu',                cmd:f'{drp_cmd0} -l 0x2 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam3_0',   flags:'spu',                cmd:f'{drp_cmd0} -l 0x4 -D fakecam -k sim_length=145'},
 {host: 'drp-srcf-cmp029', id:'tstcam4_0',   flags:'spu',                cmd:f'{drp_cmd0} -l 0x8 -D fakecam -k sim_length=145'},
#{host: 'drp-srcf-cmp010', id:'tstcam8_0',   flags:'spu',                cmd:f'{drp_cmd0} -l 0x1 -D fakecam -k sim_length=145'},
#{host: 'drp-srcf-cmp004', id:'txi_fim1_0',  flags:'spu', env:epics_env, cmd:drp_cmd0+' -l 0x10 -D wave8 -k epics_prefix=MR2K4:FIM:W8:01'},
]

#
# ami
#
procmgr_ami = [
 {host:ami_manager_node, id:'ami-global',  flags:'s', env:epics_env, cmd:f'ami-global --hutch {hutch} --prometheus-dir {prom_dir} -N 0 -n {ami_num_workers}'},
 {host:ami_manager_node, id:'ami-manager', flags:'s', cmd:f'ami-manager --hutch {hutch} --prometheus-dir {prom_dir} -n {ami_num_workers*ami_workers_per_node} -N {ami_num_workers}'},
 {                       id:'ami-client',  flags:'s', cmd:f'ami-client -H {ami_manager_node} --prometheus-dir {prom_dir} --hutch {hutch}'},
]

#
# ami workers
#
# One monReqServer (meb) plus one ami-node process per worker host.
for N, worker_node in enumerate(ami_worker_nodes):
    procmgr_ami.append({host:worker_node, id:f'ami-meb{N}', flags:'spu',
                        cmd:f'{meb_cmd} -d -n 64 -q {ami_workers_per_node}'})
    procmgr_ami.append({host:worker_node, id:f'ami-node_{N}', flags:'s', env:epics_env,
                        cmd:f'ami-node --hutch {hutch} --prometheus-dir {prom_dir} -N {N} -n {ami_workers_per_node} -H {ami_manager_node} --log-level warning worker -b {heartbeat_period} psana://shmem={hutch}'})

# NOTE(review): the ami processes are currently not launched — the extend
# below is commented out.
#procmgr_config.extend(procmgr_ami)