...
The above data was recorded in April 2023 and used to develop the detector interface. With it, we see events like the following in the data files. Both runs were taken with one panel per segment: the first is from run 277 (psana://exp=tstx00417,run=277,dir=/cds/data/drpsrcf/tst/tstx00417/xtc) and contains one panel, and the second is from run 276 (psana://exp=tstx00417,run=276,dir=/cds/data/drpsrcf/tst/tstx00417/xtc), which contains 20 segments.
DrpPython
In the next step, the new DrpPython functionality was used to compress the raw data with libpressio. We first tried the SZ algorithm and found that its performance didn't scale well with rate. SZ3 worked better: we saw compression times of ~3.5 ms at a trigger rate of 5 kHz, and calibrating the data to ready it for the compressor took 1.8 ms. To sustain this rate, the load was distributed across 60 workers (60 Python processes). The following listing shows the DrpPython script used:
```python
from psana import DataSource
from psana.dgramedit import AlgDef, DetectorDef, DataType
import psana.psexp.TransitionId
import sys
import numpy as np
from libpressio import PressioCompressor
import json

# Define the compressor configuration:
lpjson = {
    "compressor_id": "sz3",  # the compression algorithm
    "compressor_config": {
        "sz3:abs_error_bound": 10,  # maximum absolute error per pixel
        "sz3:metric": "size",
        #"pressio:nthreads": 4,
    },
}

ds = DataSource(drp=drp_info, monitor=True)  # drp_info is provided by the DRP environment
thread_num = drp_info.worker_num
cfgAlg = AlgDef("config", 0, 0, 1)
fexAlg = AlgDef("fex", 0, 0, 1)
detDef = DetectorDef(drp_info.det_name, drp_info.det_type, drp_info.det_id)
cfgDef = {
    "compressor_json": (str, 1),
}
fexDef = {
    "fex": (np.uint8, 1),  # the compressed output is an opaque byte stream, hence uint8
}
nodeId = None
namesId = None
cfg = ds.add_detector(detDef, cfgAlg, cfgDef, nodeId, namesId, drp_info.det_segment)
det = ds.add_detector(detDef, fexAlg, fexDef, nodeId, namesId, drp_info.det_segment)
cfg.config.compressor_json = json.dumps(lpjson)
ds.add_data(cfg.config)

# configure
compressor = PressioCompressor.from_config(lpjson)
#print(compressor.get_config())

for myrun in ds.runs():
    epixhr = myrun.Detector('epixhr_emu')
    for nevt, evt in enumerate(myrun.events()):
        cal = epixhr.raw.calib(evt)
        det.fex.fex = compressor.encode(cal)
        ds.add_data(det.fex)
        # Keep the uncompressed raw data only for every 1000th event
        if nevt % 1000 != 0:
            ds.remove_data('epixhr_emu', 'raw')
```
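The key knob in the configuration above is sz3:abs_error_bound, which asks the compressor to guarantee that every decompressed pixel differs from the original by at most 10 ADU. SZ3 itself achieves this with prediction, quantization, and lossless coding; the following NumPy-only sketch (our own illustration, not SZ3) shows the same pointwise error-bound contract using plain uniform quantization with step 2*bound:

```python
import numpy as np

def quantize(data, bound):
    """Uniform scalar quantization with step 2*bound. Every reconstructed
    value lies within +/- bound of the original, the same pointwise
    guarantee that sz3:abs_error_bound = 10 requests above."""
    step = 2.0 * bound
    # Small integer codes are what a downstream lossless coder would shrink.
    return np.round(data / step).astype(np.int32)

def dequantize(codes, bound):
    """Map integer codes back to representative values."""
    return codes.astype(np.float32) * (2.0 * bound)

# Stand-in for a calibrated panel (the real data comes from epixhr.raw.calib).
rng = np.random.default_rng(0)
cal = rng.normal(loc=1000.0, scale=50.0, size=(144, 192)).astype(np.float32)

codes = quantize(cal, bound=10)
recon = dequantize(codes, bound=10)
# The reconstruction error never exceeds the requested bound.
print(float(np.max(np.abs(recon - cal))))
```

The function names quantize/dequantize and the panel shape are illustrative only; in production the bound (here 10) should be chosen against the detector's noise floor so that the loss stays below the measurement uncertainty.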
The times were measured by adding Prometheus metrics to the script (omitted here for clarity) and viewing the results with Grafana. This performance was measured using one DRP segment and one panel. A typical live AMI data image from such a run (segment 0 on drp-srcf-cmp035) is:
Note that this shows data that has been decompressed by the detector interface. The following is a snapshot of the Grafana performance plot showing the calibration and compression times (1.94 ms and 3.55 ms, respectively) for a 5 kHz run. The green trace, at 5.52 ms, is the time spent in the Python script per event as seen from the C++ code's perspective.
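The actual script exported these timings as Prometheus metrics scraped into Grafana; that instrumentation is not reproduced here. A minimal stand-in that captures the same idea, per-stage wall-clock accumulation around the calibration and compression calls, could look like this (the names timed and mean_ms are ours, not part of DrpPython or Prometheus):

```python
import time
from collections import defaultdict
from contextlib import contextmanager

totals = defaultdict(float)   # accumulated seconds per stage
counts = defaultdict(int)     # number of events measured per stage

@contextmanager
def timed(stage):
    """Accumulate wall-clock time spent inside the with-block for `stage`."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        totals[stage] += time.perf_counter() - t0
        counts[stage] += 1

def mean_ms(stage):
    """Mean time per event for `stage`, in milliseconds."""
    return 1000.0 * totals[stage] / max(counts[stage], 1)

# In the event loop each stage would be wrapped, e.g.:
#   with timed("calib"):
#       cal = epixhr.raw.calib(evt)
#   with timed("compress"):
#       det.fex.fex = compressor.encode(cal)
```

A Prometheus Summary or Histogram plays the same role in the real script, with the added benefit that Grafana can plot the rates and quantiles over time rather than a single mean.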