python - RxPy with_latest_from producing inconsistent results -
i'm using group_by
on observable
each newly created group, want snap element (with new key) caused group created using with_latest_from
:
>>> __future__ import print_function >>> rx import observable >>> # sequence 1, 2, 3, ... every half second >>> observable=observable.interval(500).map(lambda x: x + 1) >>> # groups numbers divisible 3 (true) , not (false) >>> grouped = observable.group_by(lambda x: bool(x%3)) >>> # groups paired first element kicked off group >>> grouped.with_latest_from(observable, lambda group, element: (group, element)).subscribe(print)
i'm expecting see both of below printed, seeing either 1 each time.
(<rx.linq.groupedobservable.groupedobservable object @ 0xabc>, 1) # 1 element created group key=false (<rx.linq.groupedobservable.groupedobservable object @ 0xdef>, 3) # 3 element created group key=true
on odd occasion see snapped element 2:
(<rx.linq.groupedobservable.groupedobservable object @ 0x0313eb10>, 2)
any ideas going wrong?
from dag brattli:
the problem seems using with_latest_from() on stream itself. there no guaranties if source or latest trigger first. if latest triggers before source new group, lost. way solve separate stream of first elements in each group , zip stream of groups:
# stream first element if each group firsts = grouped.flat_map(lambda group: group.first()) # groups paired first element kicked off group grouped.zip(firsts, lambda g, k: (g, k)).subscribe(print)
also note each group contains key result of modulus operator if can used instead of element:
grouped.map(lambda gr: (gr, gr.key)).subscribe(print)
Comments
Post a Comment