
With the above setup, data was recorded (in April 2023) and used to develop the detector interface.  From that data we see events like the following in the data files.  Both runs were taken with one panel per segment.  The first is from run 277 (psana://exp=tstx00417,run=277,dir=/cds/data/drpsrcf/tst/tstx00417/xtc) and contains one panel; the second is from run 276 (psana://exp=tstx00417,run=276,dir=/cds/data/drpsrcf/tst/tstx00417/xtc) and contains 20 segments.

(Images: example events from run 277 (one panel) and run 276 (20 segments).)
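
For reference, a minimal sketch of reading these runs back offline with psana (the detector name epixhr_emu matches the script below; raw.raw() is assumed here to be the accessor for the uncalibrated panel data):

Code Block
languagepy
from psana import DataSource

# Open run 277 offline (one panel); run 276 holds the 20-segment data
ds = DataSource(exp='tstx00417', run=277,
                dir='/cds/data/drpsrcf/tst/tstx00417/xtc')

for run in ds.runs():
    det = run.Detector('epixhr_emu')
    for nevt, evt in enumerate(run.events()):
        frame = det.raw.raw(evt)  # assumed accessor for the raw panel array
        if frame is not None:
            print(nevt, frame.shape, frame.dtype)
        if nevt >= 5:
            break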

DrpPython

In the next step, the new DrpPython functionality was used to compress the raw data with libpressio.  We first tried the SZ algorithm and found that its performance didn't scale well with the trigger rate.  SZ3 worked better: we saw compression times of ~3.5 ms per event at a trigger rate of 5 kHz.  Calibrating the data to prepare it for the compressor took 1.8 ms per event.  60 workers (60 Python processes) were used to distribute the load and sustain the rate.  The following listing shows the DrpPython script used:

Code Block
languagepy
titleepixHrEmu.py
from psana import DataSource
from psana.dgramedit import AlgDef, DetectorDef, DataType
import psana.psexp.TransitionId
import sys
import numpy as np
from libpressio import PressioCompressor
import json

# Define compressor configuration:
lpjson = {
    "compressor_id": "sz3", #the compression algo.
    "compressor_config": {
        #"sz:data_type"           : lp.pressio_uint16_dtype,
        #"sz:data_type"           : np.dtype('uint16'),
        ###"sz:error_bound_mode_str" : "abs",
        ###"sz:abs_err_bound"        : 10, # max error
        "sz3:abs_error_bound"     : 10, # max error
        "sz3:metric"              : "size",
        #"pressio:nthreads"        : 4
    },
}

ds = DataSource(drp=drp_info, monitor=True)  # drp_info is supplied by the DRP process running this script
thread_num = drp_info.worker_num             # index of this worker process

cfgAlg = AlgDef("config", 0, 0, 1)  # algorithm name and version for the config data
fexAlg = AlgDef("fex", 0, 0, 1)     # algorithm name and version for the compressed (fex) data
detDef = DetectorDef(drp_info.det_name, drp_info.det_type, drp_info.det_id)
cfgDef = {
    "compressor_json" : (str,      1),  # field name : (data type, rank)
}
fexDef = {
    "fex"             : (np.uint8, 1), # Why not float32?
}
nodeId = None
namesId = None

cfg = ds.add_detector(detDef, cfgAlg, cfgDef, nodeId, namesId, drp_info.det_segment)
det = ds.add_detector(detDef, fexAlg, fexDef, nodeId, namesId, drp_info.det_segment)

cfg.config.compressor_json = json.dumps(lpjson)  # record the compressor settings

ds.add_data(cfg.config)  # write them into the output data

# configure the libpressio compressor
compressor = PressioCompressor.from_config(lpjson)
#print(compressor.get_config())

for myrun in ds.runs():
    epixhr = myrun.Detector('epixhr_emu')
    for nevt,evt in enumerate(myrun.events()):
        cal = epixhr.raw.calib(evt)           # calibrate the raw panel data
        det.fex.fex = compressor.encode(cal)  # compress the calibrated data with SZ3
        ds.add_data(det.fex)                  # attach the compressed payload to the event
        # drop the uncompressed raw data except for every 1000th event
        if nevt%1000!=0: ds.remove_data('epixhr_emu','raw')

The times were measured by adding Prometheus metrics to the script (omitted from the listing above for clarity) and viewing the results in Grafana.  This performance was measured using one DRP segment and one panel.
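
As a rough illustration only, per-event timing of this kind could be collected with the prometheus_client package; the metric names, port, and helper function below are hypothetical and not the ones used in the actual run:

Code Block
languagepy
import time
from prometheus_client import Summary, start_http_server

# Hypothetical metrics for the two per-event steps timed above
CALIB_TIME = Summary('epixhr_calib_seconds', 'Per-event calibration time')
COMP_TIME  = Summary('epixhr_compress_seconds', 'Per-event compression time')

start_http_server(9200)  # expose /metrics for Prometheus to scrape

def process_event(evt, epixhr, compressor):
    t0 = time.time()
    cal = epixhr.raw.calib(evt)       # calibration step
    CALIB_TIME.observe(time.time() - t0)

    t1 = time.time()
    payload = compressor.encode(cal)  # SZ3 compression step
    COMP_TIME.observe(time.time() - t1)
    return payload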

A typical live AMI data image from such a run (segment 0 on drp-srcf-cmp035):

(Image: live AMI display of the epixhr_emu data.)


Note that this shows data that has been decompressed by the detector interface (a standalone compress/decompress round trip with the same settings is sketched at the end of this section).  The following Grafana snapshot shows the calibration and compression times (1.94 ms and 3.55 ms) seen for a 5 kHz run.  The green trace at 5.52 ms is the total time spent in the Python script for each event, as seen from the C++ code.

(Image: Grafana plot of per-event calibration time, compression time, and total Python time at 5 kHz.)
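
For completeness, a minimal, self-contained sketch of the same SZ3 compress/decompress round trip outside the DAQ, using a synthetic float32 array in place of real calibrated panel data (the array shape is a placeholder):

Code Block
languagepy
import numpy as np
from libpressio import PressioCompressor

# Same SZ3 settings as in epixHrEmu.py above
lpjson = {
    "compressor_id": "sz3",
    "compressor_config": {
        "sz3:abs_error_bound": 10,  # max pointwise error
        "sz3:metric": "size",
    },
}
compressor = PressioCompressor.from_config(lpjson)

# Stand-in for a calibrated panel; in the DAQ this comes from epixhr.raw.calib(evt)
cal = (np.random.rand(144, 192) * 1000.0).astype(np.float32)

payload = compressor.encode(cal)  # compressed payload, as stored in the 'fex' field

# Decode back; the template array supplies the output shape and dtype
decoded = compressor.decode(payload, np.empty_like(cal))
assert np.max(np.abs(decoded - cal)) <= 10  # within the configured error bound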