KafkaConsumer resume partition cannot continue to receive uncommitted messages











up vote
0
down vote

favorite












I'm using one topic, one partition, one consumer, Kafka client version is 0.10.



I got two different results:




  1. If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.


  2. But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using kafka-consumer-groups.sh, it shows LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.



I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.










share|improve this question






















  • 0.10 is now pretty old, have you tried with a more recent version?
    – Mickael Maison
    20 hours ago















up vote
0
down vote

favorite












I'm using one topic, one partition, one consumer, Kafka client version is 0.10.



I got two different results:




  1. If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.


  2. But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using kafka-consumer-groups.sh, it shows LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.



I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.










share|improve this question






















  • 0.10 is now pretty old, have you tried with a more recent version?
    – Mickael Maison
    20 hours ago













up vote
0
down vote

favorite









up vote
0
down vote

favorite











I'm using one topic, one partition, one consumer, Kafka client version is 0.10.



I got two different results:




  1. If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.


  2. But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using kafka-consumer-groups.sh, it shows LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.



I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.










share|improve this question













I'm using one topic, one partition, one consumer, Kafka client version is 0.10.



I got two different results:




  1. If I paused partition first, then to produce a message and to invoke resume method. KafkaConsumer can poll the uncommitted message successfully.


  2. But If I produced message first and didn't commit its offset, then to pause the partition, after several seconds, to invoke the resume method. KafkaConsumer would not receive the uncommitted message. I checked it on Kafka server using kafka-consumer-groups.sh, it shows LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.



I have been trying to figure out it for two days, I repeated such tests a lot of times, the results are always like so. I need some suggestion or someone can tell me its Kafka's original mechanism.







apache-kafka kafka-consumer-api






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked 22 hours ago









Rick

5611




5611












  • 0.10 is now pretty old, have you tried with a more recent version?
    – Mickael Maison
    20 hours ago


















  • 0.10 is now pretty old, have you tried with a more recent version?
    – Mickael Maison
    20 hours ago
















0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
20 hours ago




0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
20 hours ago












1 Answer
1






active

oldest

votes

















up vote
1
down vote



accepted










For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.



Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.



KafkaConsumer




The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).







share|improve this answer





















  • Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
    – Rick
    11 hours ago











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














 

draft saved


draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53265928%2fkafkaconsumer-resume-partition-cannot-continue-to-receive-uncommitted-messages%23new-answer', 'question_page');
}
);

Post as a guest
































1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
1
down vote



accepted










For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.



Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.



KafkaConsumer




The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).







share|improve this answer





















  • Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
    – Rick
    11 hours ago















up vote
1
down vote



accepted










For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.



Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.



KafkaConsumer




The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).







share|improve this answer





















  • Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
    – Rick
    11 hours ago













up vote
1
down vote



accepted







up vote
1
down vote



accepted






For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.



Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.



KafkaConsumer




The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).







share|improve this answer












For your observation#2, if you restart the application, it will supply you all records from the un-committed offset, i.e. the missing record and if your consumer again does not commit, it will be sent again when application registers consumer with Kafka upon restart. It is expected.



Assuming you are using consumer.poll() which creates a hybrid-streaming interface i.e. if accumulates data coming into Kafka for the duration mentioned and provides it to the consumer for processing once the duration is finished. This continuous accumulation happens in the backend and is not dependent on whether you have committed offset or not.



KafkaConsumer




The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).








share|improve this answer












share|improve this answer



share|improve this answer










answered 14 hours ago









AbhishekN

22616




22616












  • Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
    – Rick
    11 hours ago


















  • Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
    – Rick
    11 hours ago
















Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
11 hours ago




Thank you, it explains clear. That’s my big blind spot. It seems that I can handle #2 by using seek method of KafkaConsumer.
– Rick
11 hours ago


















 

draft saved


draft discarded



















































 


draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53265928%2fkafkaconsumer-resume-partition-cannot-continue-to-receive-uncommitted-messages%23new-answer', 'question_page');
}
);

Post as a guest




















































































Popular posts from this blog

How to change which sound is reproduced for terminal bell?

Can I use Tabulator js library in my java Spring + Thymeleaf project?

Title Spacing in Bjornstrup Chapter, Removing Chapter Number From Contents