Kotlin coroutines: How to use Array channel with filter / map?
up vote
1
down vote
favorite
I need to insert a buffer of 10000 elements between the various channel processors.
produce()
provides a way to configure the buffer size:
produce(capacity = 10_000) {
}
However map
, filter
default to a Rendezvous channel:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}
Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.
kotlin kotlinx.coroutines
add a comment |
up vote
1
down vote
favorite
I need to insert a buffer of 10000 elements between the various channel processors.
produce()
provides a way to configure the buffer size:
produce(capacity = 10_000) {
}
However map
, filter
default to a Rendezvous channel:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}
Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.
kotlin kotlinx.coroutines
If you use a dispatcher other thanUnconfined
, I wonder howmap
would even work with a rendezvous channel.
– Marko Topolnik
Nov 13 at 14:12
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
1
Just to clarify: sending coroutine isGlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
– qwwdfsad
Nov 16 at 9:55
@qwwdfsad I automatically assumed thatproduce
here is just a self-contained example and that the actual work would involve communicating coroutines.
– Marko Topolnik
Nov 16 at 10:12
add a comment |
up vote
1
down vote
favorite
up vote
1
down vote
favorite
I need to insert a buffer of 10000 elements between the various channel processors.
produce()
provides a way to configure the buffer size:
produce(capacity = 10_000) {
}
However map
, filter
default to a Rendezvous channel:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}
Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.
kotlin kotlinx.coroutines
I need to insert a buffer of 10000 elements between the various channel processors.
produce()
provides a way to configure the buffer size:
produce(capacity = 10_000) {
}
However map
, filter
default to a Rendezvous channel:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}
Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.
kotlin kotlinx.coroutines
kotlin kotlinx.coroutines
asked Nov 13 at 14:03
Kshitiz Sharma
8,2311661129
8,2311661129
If you use a dispatcher other thanUnconfined
, I wonder howmap
would even work with a rendezvous channel.
– Marko Topolnik
Nov 13 at 14:12
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
1
Just to clarify: sending coroutine isGlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
– qwwdfsad
Nov 16 at 9:55
@qwwdfsad I automatically assumed thatproduce
here is just a self-contained example and that the actual work would involve communicating coroutines.
– Marko Topolnik
Nov 16 at 10:12
add a comment |
If you use a dispatcher other thanUnconfined
, I wonder howmap
would even work with a rendezvous channel.
– Marko Topolnik
Nov 13 at 14:12
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
1
Just to clarify: sending coroutine isGlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
– qwwdfsad
Nov 16 at 9:55
@qwwdfsad I automatically assumed thatproduce
here is just a self-contained example and that the actual work would involve communicating coroutines.
– Marko Topolnik
Nov 16 at 10:12
If you use a dispatcher other than
Unconfined
, I wonder how map
would even work with a rendezvous channel.– Marko Topolnik
Nov 13 at 14:12
If you use a dispatcher other than
Unconfined
, I wonder how map
would even work with a rendezvous channel.– Marko Topolnik
Nov 13 at 14:12
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
1
1
Just to clarify: sending coroutine is
GlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams– qwwdfsad
Nov 16 at 9:55
Just to clarify: sending coroutine is
GlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams– qwwdfsad
Nov 16 at 9:55
@qwwdfsad I automatically assumed that
produce
here is just a self-contained example and that the actual work would involve communicating coroutines.– Marko Topolnik
Nov 16 at 10:12
@qwwdfsad I automatically assumed that
produce
here is just a self-contained example and that the actual work would involve communicating coroutines.– Marko Topolnik
Nov 16 at 10:12
add a comment |
1 Answer
1
active
oldest
votes
up vote
1
down vote
The only way is to provide your own map implementation with capacity
parameter:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.
add a comment |
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
1
down vote
The only way is to provide your own map implementation with capacity
parameter:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.
add a comment |
up vote
1
down vote
The only way is to provide your own map implementation with capacity
parameter:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.
add a comment |
up vote
1
down vote
up vote
1
down vote
The only way is to provide your own map implementation with capacity
parameter:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.
The only way is to provide your own map implementation with capacity
parameter:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.
answered Nov 16 at 9:50
qwwdfsad
2,040820
2,040820
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53282761%2fkotlin-coroutines-how-to-use-array-channel-with-filter-map%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
If you use a dispatcher other than
Unconfined
, I wonder howmap
would even work with a rendezvous channel.– Marko Topolnik
Nov 13 at 14:12
@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51
@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52
1
Just to clarify: sending coroutine is
GlobalScope.produce
, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams– qwwdfsad
Nov 16 at 9:55
@qwwdfsad I automatically assumed that
produce
here is just a self-contained example and that the actual work would involve communicating coroutines.– Marko Topolnik
Nov 16 at 10:12