Page History
...
globalCollector: [6, 7, 8, 9, 10, 2, 3, 4, 5, 11]
Accumulator
An accumulator takes two functions as an argument, a reduction and a res_factory that is called on reset. It accumulates into self.res on each event in the following way: self.res = reduction(self.res, *args) at the end of a heartbeat workers and local collectors reset self.res by calling self.res_factory.
The accumulator is independent of the number of workers and local collectors, but consider the following two worker, two local collector example.
Code Block |
---|
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn
def make_graph():
def reduction(res, *rest):
print(res, rest)
return res+rest[0]
def res_factory():
return 0
graph = Graph(name='graph')
graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'],
reduction=reduction, res_factor=res_factory))
graph.compile(num_workers=2, num_local_collectors=2)
return graph
graph = make_graph()
graph({'delta_t': 1}, color='worker')
# 0 (1,)
graph({'delta_t': 2}, color='worker')
# 1 (2, )
graph({'delta_t': 3}, color='worker')
# 3 (3, )
graph({'delta_t': 4}, color='worker')
# 6 (4, )
worker1 = graph({'delta_t': 5}, color='worker')
# 10 (5, )
print("worker1 done")
graph2 = make_graph()
for i in range(6, 11):
worker2 = graph2({'delta_t': i}, color='worker')
print("worker2 done")
graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')
# 0 (15, )
print("localCollector1 done")
graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')
# 0 (40, )
print("localCollector2 done")
graph5 = make_graph()
globalCollector = graph5(localCollector1, color='globalCollector')
# 0 (15, )
print("globalCollector")
globalCollector = graph5(localCollector2, color='globalCollector')
# 15 (40, )
print("globalCollector")
print(globalCollector)
# {'accumulated_delta_t': 55} |
ReduceByKey
Overview
Content Tools