Proper way to assign watermarks with DataStreamSource<List<T>> using Flink



























I have JSONArray data continuously produced to a Kafka topic, and I want to process the records using the EventTime characteristic. To do that, I have to assign timestamps and watermarks to the records contained in each JSONArray.



I didn't find a convenient way to achieve this. My solution is to consume the data as a DataStreamSource<List<MockData>>, then iterate over each List and emit its elements downstream with an anonymous ProcessFunction, and finally assign watermarks to that downstream stream.



The main code is shown below:




    DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
    SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
        .process(new ProcessFunction<List<MockData>, MockData>() {
            @Override
            public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
                    throws Exception {
                value.forEach(mockData -> out.collect(mockData));
            }
        });
    convertToPojo.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(MockData element) {
                return element.getTimestamp();
            }
        });
    SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
        .keyBy("country")
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
        .process(new FlinkEventTimeCountFunction())
        .name("count elements");



The code looks right to me and runs without errors, but the ProcessWindowFunction is never triggered. I traced through the Flink source code and found that EventTimeTrigger never returns TriggerResult.FIRE, because TriggerContext.getCurrentWatermark returns Long.MIN_VALUE all the time.



What is the proper way to process a List in event time? Any suggestions will be appreciated.
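
The symptom described above (a watermark stuck at Long.MIN_VALUE, so the event-time trigger never fires) can be reproduced in a small toy model. The sketch below is plain Java and does not use Flink; the class and method names (WatermarkFiringSketch, BoundedOutOfOrderness, windowCanFire) are hypothetical and only mimic the general idea that a bounded-out-of-orderness watermark lags the highest timestamp seen by a fixed delay, and that a window [start, end) can fire once the watermark reaches end - 1.

```java
/** Toy model (not Flink's classes) of bounded-out-of-orderness watermarks and event-time firing. */
public class WatermarkFiringSketch {

    /** Tracks the highest timestamp seen and derives a watermark that lags it by a fixed delay. */
    static final class BoundedOutOfOrderness {
        private final long maxDelayMs;
        private long maxTimestampSeen;
        private boolean sawAnyEvent = false;

        BoundedOutOfOrderness(long maxDelayMs) {
            this.maxDelayMs = maxDelayMs;
        }

        /** Records an event timestamp (milliseconds). */
        void onEvent(long timestampMs) {
            maxTimestampSeen = sawAnyEvent ? Math.max(maxTimestampSeen, timestampMs) : timestampMs;
            sawAnyEvent = true;
        }

        /** Long.MIN_VALUE until the first event arrives, then max timestamp minus the delay. */
        long currentWatermark() {
            return sawAnyEvent ? maxTimestampSeen - maxDelayMs : Long.MIN_VALUE;
        }
    }

    /** A window [start, end) may fire once the watermark reaches its last timestamp, end - 1. */
    static boolean windowCanFire(long windowEndMs, long watermark) {
        return watermark >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;

        // No watermarks reach the window: the watermark stays at Long.MIN_VALUE.
        System.out.println(windowCanFire(windowEnd, Long.MIN_VALUE)); // false

        BoundedOutOfOrderness wm = new BoundedOutOfOrderness(5_000L);
        wm.onEvent(12_000L); // watermark is 7000, still before the end of the window
        System.out.println(windowCanFire(windowEnd, wm.currentWatermark())); // false
        wm.onEvent(15_000L); // watermark is 10000, past the window's last timestamp
        System.out.println(windowCanFire(windowEnd, wm.currentWatermark())); // true
    }
}
```

In this model, a window operator that never receives watermarks behaves like the first call: no amount of incoming data makes the window fire.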










      apache-flink flink-streaming
















      asked Nov 22 '18 at 6:32









      moyiguke
























          1 Answer














          The problem is that you are applying the keyBy and window operations to the convertToPojo stream, rather than the stream with timestamps and watermarks (which you didn't assign to a variable).



          If you write the code more or less like this, it should work:



          listDataStreamSource = KafkaSource ...
          convertToPojo = listDataStreamSource.process ...
          pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
          countStream = pojoPlusWatermarks.keyBy ...


          Calling assignTimestampsAndWatermarks on the convertToPojo stream does not modify that stream; it returns a new DataStream object that carries timestamps and watermarks. You need to apply your keyBy and windowing to that new stream.
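
          To make the "returns a new stream" point concrete, here is a minimal toy model in plain Java. It is not Flink code: ToyStream and eventTimeWindowCanFire are hypothetical names invented for this illustration, and only the method name assignTimestampsAndWatermarks and the variable names convertToPojo and pojoPlusWatermarks are borrowed from the discussion above. It only demonstrates the pattern of an immutable object whose transformation has to be captured.

```java
/** Toy model (not Flink's API): shows why the result of a transformation must be captured. */
public class WatermarkAssignmentSketch {

    /** Hypothetical stand-in for a stream: immutable, transformations return a new object. */
    static final class ToyStream {
        private final boolean hasWatermarks;

        ToyStream(boolean hasWatermarks) {
            this.hasWatermarks = hasWatermarks;
        }

        /** Returns a NEW stream that carries watermarks and leaves this one unchanged. */
        ToyStream assignTimestampsAndWatermarks() {
            return new ToyStream(true);
        }

        /** In this model, an event-time window can only fire on a stream that carries watermarks. */
        boolean eventTimeWindowCanFire() {
            return hasWatermarks;
        }
    }

    public static void main(String[] args) {
        ToyStream convertToPojo = new ToyStream(false);

        // Pitfall from the question: the returned stream is discarded.
        convertToPojo.assignTimestampsAndWatermarks();
        System.out.println(convertToPojo.eventTimeWindowCanFire()); // false

        // Fix: keep the returned stream and apply the window to it.
        ToyStream pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks();
        System.out.println(pojoPlusWatermarks.eventTimeWindowCanFire()); // true
    }
}
```

          In the real job, the equivalent change is to store the result of convertToPojo.assignTimestampsAndWatermarks(...) in a variable and call keyBy and window on that variable instead of on convertToPojo.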
































          • Glad to see you, and thanks for your prompt reply. I don't quite understand: does this mean a user cannot assign watermarks to a downstream stream, only to the source stream? I noticed that SourceFunction has collectWithTimestamp, but Collector has nothing similar.

            – moyiguke
            Nov 23 '18 at 9:51













          • I've expanded my answer to hopefully be more clear.

            – David Anderson
            Nov 23 '18 at 10:03











          • Thank you so much! Your precise explanation enlightened me. Following your guidance and pseudocode, it works well now.

            – moyiguke
            Nov 24 '18 at 14:24












          edited Nov 23 '18 at 10:03

























          answered Nov 22 '18 at 8:35









          David Anderson












