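From Dan Damiani, on 12/2/20. This is an alternative to the "mailbox" example here: https://mdavidsaver.github.io/p4p/server.html#running-a-pva-server. While the server is running, you can view the variables from the command line with any PVAccess client; the PVs are named demo:pv:<name> (for example, `pvget demo:pv:integer`).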
|
"""Example PVAccess server that publishes a few demo PVs using p4p.

Each PV is registered as ``demo:pv:<name>``.  A background thread runs the
PVA server while the main thread periodically posts new values to the PVs.
"""
import time
import threading

import numpy as np
from p4p.nt import NTEnum, NTScalar, NTNDArray
from p4p.server import Server, StaticProvider
from p4p.server.thread import SharedPV


# Type codes accepted by NTScalar (the 1-d array PVs below prefix the
# code with 'a', e.g. 'ad', 'ai'):
#
#   Code  Type
#   ?     bool
#   s     unicode
#   b     s8
#   B     u8
#   h     s16
#   H     u16
#   i     i32
#   I     u32
#   l     i64
#   L     u64
#   f     f32
#   d     f64
#   v     variant
#   U     union
#   S     struct


class ExampleServer:
    """Serve a set of SharedPVs from a StaticProvider in a daemon thread.

    Parameters
    ----------
    pv_info : iterable of (name, nttype, init)
        ``name`` is the PV name suffix, ``nttype`` the normative-type
        wrapper passed to ``SharedPV(nt=...)`` and ``init`` the initial value.
    prefix : str, optional
        String prepended to every ``name`` to form the full PV name.
        Defaults to ``'demo:pv:'``.
    """

    def __init__(self, pv_info, prefix='demo:pv:'):
        self.provider = StaticProvider('example_provider')
        self.pvs = {}
        for name, nttype, init in pv_info:
            pv = SharedPV(nt=nttype, initial=init)
            self.provider.add('%s%s' % (prefix, name), pv)
            # Keep a handle keyed by the short name so values can be posted.
            self.pvs[name] = pv

        # The server blocks, so it runs in its own thread; daemon=True lets
        # the process exit when the main thread finishes.
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def count_forever(self, period=2.0):
        """Increment a counter every ``period`` seconds and post it to the PVs.

        Never returns.  Expects PVs named 'integer', 'float', 'boolean',
        'floatnp', 'integernp' and 'image' to be present in ``self.pvs``;
        a missing name raises ``KeyError``.
        """
        count = 0
        while True:
            time.sleep(period)
            count += 1
            self.pvs['integer'].post(count)
            self.pvs['float'].post(count / 10)
            self.pvs['boolean'].post(count % 2 == 0)
            self.pvs['floatnp'].post(np.full(10, count / 10, dtype=np.float64))
            self.pvs['integernp'].post(np.full(10, count, dtype=np.int32))
            # Wrap explicitly so the fill value always fits in uint16 instead
            # of relying on implicit out-of-range integer conversion.
            self.pvs['image'].post(
                np.full((10, 10), count % 65536, dtype=np.uint16))

    def run(self):
        """Run the PVA server for this provider (thread target; blocks)."""
        Server.forever(providers=[self.provider])


def main():
    """Build the demo PV table, start the server and count until Ctrl-C."""
    pv_info = [
        ('float', NTScalar('d'), 0.0),
        # Native int initial value, consistent with the declared 'i' type.
        ('integer', NTScalar('i'), 0),
        ('boolean', NTScalar('?'), True),
        ('enum', NTEnum(), {'choices': ['apple', 'cart', 'horse'], 'index': 0}),
        # 1-d arrays
        ('floatnp', NTScalar('ad'), np.zeros(10, dtype=np.float64)),
        ('integernp', NTScalar('ai'), np.zeros(10, dtype=np.int32)),
        # 2-d arrays (a.k.a. images)
        ('image', NTNDArray(), np.zeros((10, 10), dtype=np.uint16)),
    ]
    server = ExampleServer(pv_info)
    try:
        server.count_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the demo; exit quietly.
        pass


if __name__ == '__main__':
    main()
Overview
Content Tools