asynchronous python itertools chain multiple generators












UPDATED QUESTION FOR CLARITY:



Suppose I have two processing generator functions:



def gen1():  # just for examples,
    yield 1  # yields actually carry
    yield 2  # different computation weight
    yield 3  # in my case

def gen2():
    yield 4
    yield 5
    yield 6


I can chain them with itertools



from itertools import chain

mix = chain(gen1(), gen2())


and then I can create another generator function that wraps it:



def mix_yield():
    for item in mix:
        yield item


or, if I simply want to call next(mix), that works too.
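For instance, continuing from the mix object above, a quick sketch of consuming it by hand:

print(next(mix))  # 1
print(next(mix))  # 2
print(list(mix))  # [3, 4, 5, 6] -- the rest, still in definition order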



My question is, how can I do the equivalent in asynchronous code?



Because I need it to:




  • return values one by one (with yield, or via a next-style iterator)

  • yield whichever value resolves first (async)


PREV. UPDATE:



After experimenting and researching, I found the aiostream library, which describes itself as an async version of itertools. So this is what I did:



import asyncio
from aiostream import stream

async def gen1():
    await asyncio.sleep(0)
    yield 1
    await asyncio.sleep(0)
    yield 2
    await asyncio.sleep(0)
    yield 3

async def gen2():
    await asyncio.sleep(0)
    yield 4
    await asyncio.sleep(0)
    yield 5
    await asyncio.sleep(0)
    yield 6

a_mix = stream.combine.merge(gen1(), gen2())

async def a_mix_yield():
    for item in a_mix:
        yield item


but I still can't do next(a_mix)



TypeError: 'merge' object is not an iterator


or next(await a_mix)



raise StreamEmpty()


I can still make it into a list, though:



print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
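For reference, a self-contained sketch of that step; the slow/fast generator names and sleep durations are made up just to show that merge emits whichever value resolves first (asyncio.run needs Python 3.7+):

import asyncio
from aiostream import stream

async def slow():
    await asyncio.sleep(0.05)
    yield 'slow-1'
    await asyncio.sleep(0.05)
    yield 'slow-2'

async def fast():
    await asyncio.sleep(0.01)
    yield 'fast-1'
    await asyncio.sleep(0.01)
    yield 'fast-2'

async def collect():
    # merge interleaves items by completion time, not by argument order
    merged = stream.combine.merge(slow(), fast())
    return await stream.list(merged)

print(asyncio.run(collect()))
# likely ['fast-1', 'fast-2', 'slow-1', 'slow-2']; exact order depends on timing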


so one goal is completed, one more to go:





  • return values one by one (with yield, or via a next-style iterator)

    - yielding whichever value resolves first (async)


python python-3.x asynchronous python-asyncio sequence-generators






asked Nov 22 '18 at 1:55 by Ardhi, edited Nov 22 '18 at 17:59

  • Your code above is just creating a couple of generators and iterating through them. That's why you are seeing them printed in order. You could iterate through gen2 first and it would print 4, 5, 6, 1, 2, 3. Perhaps you should find a different example to show what you're trying to do.

    – user2263572
    Nov 22 '18 at 14:37











  • In my case gen1() and gen2() don't yield at the same time. I will update my question; I think I've already found the answer with aiostream (I hope).

    – Ardhi
    Nov 22 '18 at 15:00











  • Sorry people for the confusion, I updated the question for clarity.

    – Ardhi
    Nov 22 '18 at 17:49



















1 Answer






The async equivalent of next is the __anext__ method on the async iterator. The iterator is in turn obtained by calling __aiter__ (in analogy to __iter__) on an iterable. Unrolled async iteration looks like this:



a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...


The __anext__ method will raise StopAsyncIteration when no more elements are available. To iterate over async iterators, you should use async for rather than for.
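For example, a minimal next()-style helper built on __anext__ could look like this (manual_anext and ticker are just illustrative names; Python 3.10+ also provides an anext() builtin that does the same job):

import asyncio

async def manual_anext(ait, default=None):
    # await the next item; fall back to a default once the iterator is exhausted
    try:
        return await ait.__anext__()
    except StopAsyncIteration:
        return default

async def ticker():
    for i in range(3):
        await asyncio.sleep(0)
        yield i

async def demo():
    it = ticker().__aiter__()
    print(await manual_anext(it))          # 0
    print(await manual_anext(it))          # 1
    print(await manual_anext(it))          # 2
    print(await manual_anext(it, 'done'))  # 'done', the iterator is exhausted

asyncio.run(demo())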



Here is a runnable example, based on your code, using both __anext__ and async for to exhaust the stream set up with aiostream.stream.combine.merge:



async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())





answered Nov 22 '18 at 18:29 by user4815162342, edited Nov 23 '18 at 23:03













  • Thank you so much, I had to re-read it multiple times and connect the dots before I completely understood it.

    – Ardhi
    Nov 23 '18 at 2:49











  • @Ardhi Another good resource is the PEP that introduced them.

    – user4815162342
    Nov 23 '18 at 6:57








  • Just mentioning that entering a streaming context in aiostream is meant to be done using async with zs.stream() as streamer:, as seen in this demonstration.

    – Vincent
    Nov 23 '18 at 9:02













  • thank you @Vincent, good catch.

    – Ardhi
    Nov 23 '18 at 14:37











  • @Vincent Thanks, I've now amended the answer to use the advertised pattern.

    – user4815162342
    Nov 23 '18 at 23:05


















