Spring Cloud Stream Reactive - How to do the Error Handling in case of reactive stream pipeline?




















How can error handling be done for a reactive stream pipeline? Specifically:

  • Application error handling (e.g. errorChannel)

  • System error handling (working with a DLQ, reprocessing, etc.)


The current documentation only describes error handling for the non-reactive pipeline.
https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_handling




Spring Cloud Stream makes error handling easy for users by providing simple configuration for the common scenarios. It would be great if the same error handling use cases (with the same configuration) worked for the reactive stream pipeline as well. The use cases and their respective configuration are listed below:





  • @StreamListener("errorChannel") annotation for global error handling

  • @KafkaListener(id="bar", topics = "reactive-stream-error-topic")

  • Configs for the DLQ and producing failed messages to the error topics
    spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
    spring.cloud.stream.kafka.bindings.input.consumer.dlqName=reactive-stream-error-topic



The example from the documentation works fine with spring-cloud-stream, but the same approach gives an error for the reactive pipeline. Any guidance in this direction would be of great help to the community. Thanks in advance!



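import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveStreamSinkApplication.class, args);
    }

    // Reactive handler: receives the whole input stream as a Flux.
    @StreamListener
    public void receive(@Input(Sink.INPUT) Flux<String> inputFlux) {
        inputFlux.subscribe(System.out::println);
        // Simulate a failure.
        throw new RuntimeException("BOOM!");
    }

    // Global error handler.
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }

    // Reads the failed messages back from the DLQ topic.
    @KafkaListener(id = "bar", topics = "reactive-stream-error-topic")
    public void error(String in) {
        System.out.println(in + " from DLQ");
    }
}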















      spring-cloud-stream spring-kafka














edited Nov 28 '18 at 10:04 by Mansingh Shitole
asked Nov 22 '18 at 21:34 by Mansingh Shitole
























1 Answer














Sorry for the late reply.

First, there is a problem in your code, namely in how you expect an exception to be thrown from the reactive stream. You are dealing with a declarative handler, which is treated quite differently. In your code, the receive method is invoked only once, during startup and initialization. Throwing an exception from it is therefore not at all the same as an exception thrown during stream processing, which is the case the error handling mechanism you are asking about was designed for.
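To make that distinction concrete, here is a minimal plain-Java sketch. TinySource is a hypothetical stand-in for a reactive source (it is not Reactor's Flux or any Spring API), and the counters exist only for illustration. It shows that the handler method runs once to wire up the pipeline, while the callback given to subscribe runs once per message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AssemblyVsProcessing {

    /** Hypothetical stand-in for a reactive source; not Reactor's Flux. */
    static final class TinySource {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> onNext) {
            subscribers.add(onNext);
        }

        void emit(String value) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    static int wiringCalls = 0;
    static int elementCalls = 0;

    /** Plays the role of the receive(...) handler: called once, at startup. */
    static void receive(TinySource input) {
        wiringCalls++;
        // This callback is what runs for every message.
        input.subscribe(value -> elementCalls++);
        // An exception thrown here would fail the wiring step,
        // not the processing of any individual message.
    }

    /** Wires the handler once, then delivers the given messages. */
    static int[] run(String... messages) {
        wiringCalls = 0;
        elementCalls = 0;
        TinySource source = new TinySource();
        receive(source);
        for (String message : messages) {
            source.emit(message);
        }
        return new int[] {wiringCalls, elementCalls};
    }

    public static void main(String[] args) {
        int[] counts = run("a", "b", "c");
        // prints "wiring calls: 1, element calls: 3"
        System.out.println("wiring calls: " + counts[0] + ", element calls: " + counts[1]);
    }
}
```

An exception that should reach per-message error handling therefore has to come from the per-message callback, not from the method that sets the pipeline up.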



That aside...

With the introduction of the Spring Cloud Function programming model, we are considering shifting attention away from the reactive module altogether, since Spring Cloud Function already supports the reactive programming model. So consider the following:



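import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        // Select the functional bean to bind as the consumer.
        SpringApplication.run(ReactiveStreamSinkApplication.class,
                "--spring.cloud.stream.function.definition=myconsumer");
    }

    @Bean
    public Consumer<Flux<String>> myconsumer() {
        // The lambda passed to subscribe runs once per message, so this
        // exception is thrown during stream processing.
        return stream -> stream.subscribe(value -> {
            if ("foo".equals(value)) {
                throw new RuntimeException("BOOM!");
            }
            System.out.println("Received value: " + value);
        });
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }
}


Try that and let us know.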






























• Excited to try out Spring Cloud Function in the pipeline. I will try your suggestion and keep you updated on the test results. Thank you!

  – Mansingh Shitole, Dec 5 '18 at 12:42











• It works partially. I can see the error message in my @StreamListener("errorChannel") method, but the failed messages don't go to the DLQ, i.e. the error topic. The same configuration works fine in the non-reactive stream application.

  – Mansingh Shitole, Dec 6 '18 at 16:07











• In addition, a new error popped up. The exception is java.lang.ClassCastException: ReactiveStreamSinkApplication$$Lambda cannot be cast to java.util.function.Function. This is due to the SCF return type Consumer<Flux<String>>. It worked when I changed the return type to one without Flux, i.e. Consumer<String>. This seems like a big problem, since we would like to use Flux for more declarative-style programming inside the myconsumer() method. Could you please check on this? Thank you!

  – Mansingh Shitole, Dec 6 '18 at 16:20






• @MansinghShitole please see github.com/spring-cloud/spring-cloud-stream/issues/1539, as the issue was fixed. Give it a try and let us know.

  – Oleg Zhurakousky, Dec 10 '18 at 17:04











• Thanks for the update. It works! No more ClassCastException, and I can use Consumer<Flux<String>>. Unfortunately, error handling has stopped working again with this fix. Before (without Flux), after the RuntimeException was thrown, the error used to propagate back to the errorChannel. Now the consumer's Flux just gets cancelled and the flow stops there. I think it's a bug, or we need to find another way to handle this. Could you please check on this? Thank you.

  – Mansingh Shitole, Dec 14 '18 at 15:02
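One possible way to keep the stream alive after a failure, offered only as a sketch and not confirmed anywhere in this thread, is to catch the exception inside the per-message callback so that it never reaches the subscriber. The guarded helper and the failed list below are hypothetical names, written in plain Java without Reactor or Spring. In a real application the wrapped consumer would be what is passed to stream.subscribe(...), and the error callback is where a failed message could be published to an error topic by hand. This does not make enableDlq work for the reactive pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class GuardedConsumerSketch {

    /**
     * Hypothetical helper: wraps a handler so that a RuntimeException thrown
     * for one value is passed to onError instead of propagating to the caller.
     */
    static <T> Consumer<T> guarded(Consumer<T> handler,
                                   BiConsumer<T, RuntimeException> onError) {
        return value -> {
            try {
                handler.accept(value);
            } catch (RuntimeException e) {
                onError.accept(value, e);
            }
        };
    }

    /** Feeds the values through a guarded handler; returns [processed, failed]. */
    static List<List<String>> run(String... values) {
        List<String> processed = new ArrayList<>();
        List<String> failed = new ArrayList<>();

        // Same failure condition as the consumer in the answer above.
        Consumer<String> handler = value -> {
            if ("foo".equals(value)) {
                throw new RuntimeException("BOOM!");
            }
            processed.add(value);
        };

        // Failed values are collected here; a real application might
        // forward them to an error topic instead.
        Consumer<String> safe = guarded(handler, (value, error) -> failed.add(value));

        for (String value : values) {
            safe.accept(value);
        }
        return Arrays.asList(processed, failed);
    }

    public static void main(String[] args) {
        List<List<String>> result = run("a", "foo", "b");
        System.out.println("processed: " + result.get(0)); // prints "processed: [a, b]"
        System.out.println("failed: " + result.get(1));    // prints "failed: [foo]"
    }
}
```

The trade-off is that the framework never sees the exception, so nothing reaches the errorChannel unless the error callback sends it there explicitly.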

















































answered Nov 29 '18 at 13:29 by Oleg Zhurakousky













          • Excited to try out the Spring Cloud Function in the pipeline. I will try your suggestion and keep you updated on the test results. Thank you!

            – Mansingh Shitole
            Dec 5 '18 at 12:42











          • It works partially. I can see the error message in my @StreamListener("errorChannel") method. But the failed message don't go to the DLQ ie error-topic. The same configurations work fine in the non-reactive stream application.

            – Mansingh Shitole
            Dec 6 '18 at 16:07











          • In addition, there's new error popped up, The exception track is this java.lang.ClassCastException: ReactiveStreamSinkApplication$$Lambda cannot be cast to java.util.function.Function This is due to the return type of SCF Consumer<Flux<String>>. It worked when I changed the return type to without Flux i.e. Consumer<String>. This seems a big problem. Since we would like to have Flux to do more declarative style programming inside myconsumer() method. Could you please check on this. Thank you!

            – Mansingh Shitole
            Dec 6 '18 at 16:20






          • 1





            @MansinghShitole please see github.com/spring-cloud/spring-cloud-stream/issues/1539 as the issue was fixed. Give it a try and let us know.

            – Oleg Zhurakousky
            Dec 10 '18 at 17:04











          • Thanks for the update. It works! No more class cast exception and I can use Consumer<Flux<String>>. But unfortunately, error handling not working again due to this fix. Before (without Flux), after the RuntimeException thrown - the error used to propagate back to the errorChannel. Now the Consumer Flux just gets cancelled and now the flow stops their only. I think its bug or we need to find out another way to handle this. Colud you please check on this. Thank you.

            – Mansingh Shitole
            Dec 14 '18 at 15:02





















          • Excited to try out the Spring Cloud Function in the pipeline. I will try your suggestion and keep you updated on the test results. Thank you!

            – Mansingh Shitole
            Dec 5 '18 at 12:42











          • It works partially. I can see the error message in my @StreamListener("errorChannel") method. But the failed message don't go to the DLQ ie error-topic. The same configurations work fine in the non-reactive stream application.

            – Mansingh Shitole
            Dec 6 '18 at 16:07











          • In addition, there's new error popped up, The exception track is this java.lang.ClassCastException: ReactiveStreamSinkApplication$$Lambda cannot be cast to java.util.function.Function This is due to the return type of SCF Consumer<Flux<String>>. It worked when I changed the return type to without Flux i.e. Consumer<String>. This seems a big problem. Since we would like to have Flux to do more declarative style programming inside myconsumer() method. Could you please check on this. Thank you!

            – Mansingh Shitole
            Dec 6 '18 at 16:20






          • 1





            @MansinghShitole please see github.com/spring-cloud/spring-cloud-stream/issues/1539 as the issue was fixed. Give it a try and let us know.

            – Oleg Zhurakousky
            Dec 10 '18 at 17:04











          • Thanks for the update. It works! No more class cast exception and I can use Consumer<Flux<String>>. But unfortunately, error handling not working again due to this fix. Before (without Flux), after the RuntimeException thrown - the error used to propagate back to the errorChannel. Now the Consumer Flux just gets cancelled and now the flow stops their only. I think its bug or we need to find out another way to handle this. Colud you please check on this. Thank you.

            – Mansingh Shitole
            Dec 14 '18 at 15:02





















































