Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def make_graph():
    """Build and compile a graph containing a single Accumulator node.

    The node is named 'accumulated', reads the input 'delta_t' and writes the
    output 'accumulated_delta_t'. The graph is compiled for 2 workers and
    2 local collectors.

    Returns:
        The compiled Graph instance.
    """
    def reduction(res, *rest):
        # Print the running result and the incoming values so that every
        # reduction step is visible when the example is run.
        print(res, rest)
        return res + rest[0]

    def res_factory():
        # Initial value of the running result.
        return 0

    graph = Graph(name='graph')
    # The keyword is `res_factory`; the previous spelling `res_factorfactory`
    # was a merge artifact and is not a parameter of the node.
    graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'],
                             reduction=reduction, res_factory=res_factory))
    graph.compile(num_workers=2, num_local_collectors=2)
    return graph


graph = make_graph()
# First worker: feed delta_t = 1..5 through the 'worker' color. Only the result
# of the last call is kept. Expected reduction printout, one line per call:
#   0 (1,)
#   1 (2,)
#   3 (3,)
#   6 (4,)
#   10 (5,)
for i in range(1, 6):
    worker1 = graph({'delta_t': i}, color='worker')
print("worker1 done")

# Second worker: a separate graph receives delta_t = 6..10.
graph2 = make_graph()
for i in range(6, 11):
    worker2 = graph2({'delta_t': i}, color='worker')
print("worker2 done")

# Each local collector takes the output of one worker.
graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')  # prints: 0 (15,)
print("localCollector1 done")

graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')  # prints: 0 (40,)
print("localCollector2 done")

# The global collector takes both local collector outputs in turn.
# Expected reduction printout: 0 (15,) and then 15 (40,)
graph5 = make_graph()
for collected in (localCollector1, localCollector2):
    globalCollector = graph5(collected, color='globalCollector')
    print("globalCollector")
print(globalCollector)
# {'accumulated_delta_t': 55}

...

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528

ReduceByKey

ReduceByKey takes a key, a value, and a reduction function, and reduces into a dictionary as follows: if the key is already in self.res, then self.res[key] = reduction(self.res[key], value); otherwise self.res[key] = value.

Similar to Accumulator, it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate, consider the following example:

Code Block
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def reduction(res, *rest):
    """Print the arguments, then add the first extra value to the running result.

    Args:
        res: The running result so far.
        *rest: Incoming values; only the first one is used in the sum.

    Returns:
        ``res + rest[0]``.
    """
    print(res, rest)
    incoming = rest[0]
    return res + incoming

def res_factory():
    """Return the integer 0.

    NOTE(review): this helper is not passed to the ReduceByKey node in the
    example below; confirm whether it is still needed.
    """
    initial = 0
    return initial

graph = Graph(name='graph')
reduce_node = gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'],
                             reduction=reduction)
graph.add(reduce_node)
graph.compile(num_workers=2, num_local_collectors=2)

# Worker: three events, two of which share key 1. Only the last result is kept.
# The third event is expected to make reduction print: 3 (13,)
for event in ({'key': 1, 'value': 3}, {'key': 2, 'value': 7}, {'key': 1, 'value': 13}):
    worker = graph(event, color='worker')
print("worker:", worker)  # worker: {'accumulated_worker': {1: 16, 2: 7}}

# Local collector: the same worker output is submitted three times.
for _ in range(3):
    localCollector = graph(worker, color='localCollector')
print("localCollector:", localCollector)  # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}}

# Global collector: the local collector output is submitted twice.
for _ in range(2):
    globalCollector = graph(localCollector, color='globalCollector')
print("globalCollector:", globalCollector)  # globalCollector: {'accumulated': {1: 96, 2: 42}}

For more complicated examples, look at MeanVsScan and MeanWaveformVsScan in the Operators section of AMI:

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L102-L120

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L149-L184