Kotlin coroutines: How to use Array channel with filter / map?











up vote
1
down vote

favorite












I need to insert a buffer of 10000 elements between the various channel processors.



produce() provides a way to configure the buffer size:



produce(capacity = 10_000) {
}


However map, filter default to a Rendezvous channel:



fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}


Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.










share|improve this question






















  • If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
    – Marko Topolnik
    Nov 13 at 14:12












  • @MarkoTopolnik why do you think it will not work?
    – qwwdfsad
    Nov 16 at 9:51










  • @qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
    – Marko Topolnik
    Nov 16 at 9:52








  • 1




    Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
    – qwwdfsad
    Nov 16 at 9:55










  • @qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
    – Marko Topolnik
    Nov 16 at 10:12















up vote
1
down vote

favorite












I need to insert a buffer of 10000 elements between the various channel processors.



produce() provides a way to configure the buffer size:



produce(capacity = 10_000) {
}


However map, filter default to a Rendezvous channel:



fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}


Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.










share|improve this question






















  • If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
    – Marko Topolnik
    Nov 13 at 14:12












  • @MarkoTopolnik why do you think it will not work?
    – qwwdfsad
    Nov 16 at 9:51










  • @qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
    – Marko Topolnik
    Nov 16 at 9:52








  • 1




    Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
    – qwwdfsad
    Nov 16 at 9:55










  • @qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
    – Marko Topolnik
    Nov 16 at 10:12













up vote
1
down vote

favorite









up vote
1
down vote

favorite











I need to insert a buffer of 10000 elements between the various channel processors.



produce() provides a way to configure the buffer size:



produce(capacity = 10_000) {
}


However map, filter default to a Rendezvous channel:



fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}


Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.










share|improve this question













I need to insert a buffer of 10000 elements between the various channel processors.



produce() provides a way to configure the buffer size:



produce(capacity = 10_000) {
}


However map, filter default to a Rendezvous channel:



fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}


Is there a way to configure this? Currently I'm constructing my own versions of these stdlib functions with a buffer, which isn't very elegant.







kotlin kotlinx.coroutines






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 13 at 14:03









Kshitiz Sharma

8,2311661129




8,2311661129












  • If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
    – Marko Topolnik
    Nov 13 at 14:12












  • @MarkoTopolnik why do you think it will not work?
    – qwwdfsad
    Nov 16 at 9:51










  • @qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
    – Marko Topolnik
    Nov 16 at 9:52








  • 1




    Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
    – qwwdfsad
    Nov 16 at 9:55










  • @qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
    – Marko Topolnik
    Nov 16 at 10:12


















  • If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
    – Marko Topolnik
    Nov 13 at 14:12












  • @MarkoTopolnik why do you think it will not work?
    – qwwdfsad
    Nov 16 at 9:51










  • @qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
    – Marko Topolnik
    Nov 16 at 9:52








  • 1




    Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
    – qwwdfsad
    Nov 16 at 9:55










  • @qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
    – Marko Topolnik
    Nov 16 at 10:12
















If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
– Marko Topolnik
Nov 13 at 14:12






If you use a dispatcher other than Unconfined, I wonder how map would even work with a rendezvous channel.
– Marko Topolnik
Nov 13 at 14:12














@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51




@MarkoTopolnik why do you think it will not work?
– qwwdfsad
Nov 16 at 9:51












@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52






@qwwdfsad Later on I realized it would, but awkwardly. The sending coroutine would remain suspended until the receiving coroutine gets and processes the item.
– Marko Topolnik
Nov 16 at 9:52






1




1




Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
– qwwdfsad
Nov 16 at 9:55




Just to clarify: sending coroutine is GlobalScope.produce, not the one which sends an element to the original channel. I honestly don't see why is it awkward for hot streams
– qwwdfsad
Nov 16 at 9:55












@qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
– Marko Topolnik
Nov 16 at 10:12




@qwwdfsad I automatically assumed that produce here is just a self-contained example and that the actual work would involve communicating coroutines.
– Marko Topolnik
Nov 16 at 10:12












1 Answer
1






active

oldest

votes

















up vote
1
down vote













The only way is to provide your own map implementation with capacity parameter:



fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}


I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.






share|improve this answer





















    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53282761%2fkotlin-coroutines-how-to-use-array-channel-with-filter-map%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes








    up vote
    1
    down vote













    The only way is to provide your own map implementation with capacity parameter:



    fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context, capacity = capacity) {
    consumeEach {
    send(transform(it))
    }
    }


    I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.






    share|improve this answer

























      up vote
      1
      down vote













      The only way is to provide your own map implementation with capacity parameter:



      fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
      produce(context, capacity = capacity) {
      consumeEach {
      send(transform(it))
      }
      }


      I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.






      share|improve this answer























        up vote
        1
        down vote










        up vote
        1
        down vote









        The only way is to provide your own map implementation with capacity parameter:



        fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
        produce(context, capacity = capacity) {
        consumeEach {
        send(transform(it))
        }
        }


        I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.






        share|improve this answer












        The only way is to provide your own map implementation with capacity parameter:



        fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
        produce(context, capacity = capacity) {
        consumeEach {
        send(transform(it))
        }
        }


        I've created a https://github.com/Kotlin/kotlinx.coroutines/issues/841, but it won't be implemented soon. All channel operators can be potentially reworked when cold streams are introduced to make cancellation and scopes hierarchies consistent between hot and cold data sources.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 16 at 9:50









        qwwdfsad

        2,040820




        2,040820






























            draft saved

            draft discarded




















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.





            Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


            Please pay close attention to the following guidance:


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53282761%2fkotlin-coroutines-how-to-use-array-channel-with-filter-map%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            How to change which sound is reproduced for terminal bell?

            Title Spacing in Bjornstrup Chapter, Removing Chapter Number From Contents

            Can I use Tabulator js library in my java Spring + Thymeleaf project?