Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

From Dan Damiani, on 12/2/20.  This is an alternative to the "mailbox" example here: https://mdavidsaver.github.io/p4p/server.html#running-a-pva-server.  You can view the variables on the command line like this:

(ps-4.1.0) psbuild-rhel7-01:lcls2$ pvget -m demo:pv:float
demo:pv:float 2020-12-18 18:09:46.464  7.6
demo:pv:float 2020-12-18 18:09:48.464  7.7
demo:pv:float 2020-12-18 18:09:50.467  7.8


Code Block
languagepy
import time
import threading
import numpy as np
from p4p.nt import NTEnum, NTScalar, NTNDArray
from p4p.server import Server, StaticProvider
from p4p.server.thread import SharedPV


class ExampleServer:
    def __init__(self, pv_info):
        self.provider = StaticProvider('example_provider')
        self.pvs = {}
        for name, nttype, init in pv_info:
            pv = SharedPV(nt=nttype, initial=init)
            self.provider.add('demo:pv:%s' % name, pv)
            self.pvs[name] = pv
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def count_forever(self):
        count = 0
        while True:
            time.sleep(2.0)
            count += 1
            self.pvs['integer'].post(count)
            self.pvs['float'].post(count/10)
            self.pvs['boolean'].post(count % 2 == 0)
            self.pvs['floatnp'].post(np.full(10, count/10, dtype=np.float64))
            self.pvs['integernp'].post(np.full(10, count, dtype=np.int32))
            self.pvs['image'].post(np.full((10,10), count, dtype=np.uint16))

    def run(self):
        Server.forever(providers=[self.provider])

if __name__ == '__main__':
    """
    Code    Type
    ?   bool
    s   unicode
    b   s8
    B   u8
    h   s16
    H   u16
    i   i32
    I   u32
    l   i64
    L   u64
    f   f32
    d   f64
    v   variant
    U   union
    S   struct
    """
    pv_info = [
        ('float', NTScalar('d'), 0.0),
        ('integer', NTScalar('i'), '0'),
        ('boolean', NTScalar('?'), True),
        ('enum', NTEnum(), {'choices': ['apple', 'cart', 'horse'], 'index': 0}),
        # 1-d arrays
        ('floatnp', NTScalar('ad'), np.zeros(10, dtype=np.float64)),
        ('integernp', NTScalar('ai'), np.zeros(10, dtype=np.int32)),
        # 2-d arrays (a.k.a. images)
        ('image', NTNDArray(), np.zeros((10,10), dtype=np.uint16)),
    ]

    server = ExampleServer(pv_info)

    try:
        server.count_forever()
    except KeyboardInterrupt:
        pass

...