KafkaConsumer resume partition cannot continue to receive uncommitted messages
I'm using one topic, one partition, and one consumer; the Kafka client version is 0.10.
I got two different results:
1. If I pause the partition first, then produce a message and invoke the resume method, the KafkaConsumer can poll the uncommitted message successfully.
2. But if I produce the message first without committing its offset, then pause the partition and, after several seconds, invoke the resume method, the KafkaConsumer will not receive the uncommitted message. I checked on the Kafka server using kafka-consumer-groups.sh; it shows LOG-END-OFFSET minus CURRENT-OFFSET = LAG = 1.
I have been trying to figure this out for two days. I have repeated these tests many times and the results are always the same. I need some suggestions, or for someone to tell me whether this is Kafka's intended mechanism.
apache-kafka kafka-consumer-api
0.10 is now pretty old, have you tried with a more recent version?
– Mickael Maison
20 hours ago
asked 22 hours ago
Rick
1 Answer
accepted
For your observation #2: if you restart the application, it will supply all records from the uncommitted offset, i.e. the missing record, and if your consumer again does not commit, the record will be sent once more when the application registers the consumer with Kafka on restart. This is expected behavior.
I'm assuming you are using consumer.poll(), which creates a hybrid-streaming interface: it accumulates data arriving in Kafka for the duration you pass in and hands it to the consumer for processing once that duration has elapsed. This continuous accumulation happens in the background and does not depend on whether you have committed an offset or not.
From the KafkaConsumer documentation:
The position of the consumer gives the offset of the next record that
will be given out. It will be one larger than the highest offset the
consumer has seen in that partition. It automatically advances every
time the consumer receives messages in a call to poll(long).
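This mechanism can be illustrated with a toy in-memory model. To be clear, this is a simulation for illustration only, not the real Kafka client: ToyConsumer and its fields are hypothetical names that mimic the semantics described above, where the position advances on every poll() regardless of commits, so pause()/resume() alone never re-delivers a record.

```python
# Toy model (NOT the real Kafka client) of why pause()/resume() does not
# re-deliver uncommitted records: the position advances on poll() whether
# or not the offset was ever committed.

class ToyConsumer:
    def __init__(self, log):
        self.log = log            # the partition's records
        self.position = 0         # offset of the next record poll() returns
        self.committed = 0        # last committed offset (CURRENT-OFFSET)
        self.paused = False

    def poll(self):
        if self.paused or self.position >= len(self.log):
            return []
        records = self.log[self.position:]
        self.position = len(self.log)   # advances even without a commit
        return records

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False       # resumes from position, not from committed

    def lag(self):
        # LOG-END-OFFSET minus CURRENT-OFFSET, as kafka-consumer-groups.sh reports
        return len(self.log) - self.committed


c = ToyConsumer(log=["m0"])
first = c.poll()          # delivers m0; its offset is never committed
c.pause()
c.resume()
second = c.poll()         # empty: position already points past m0
print(first, second, c.lag())   # ['m0'] [] 1  -> matches LAG = 1
```

This reproduces the puzzling state in scenario #2: the record was delivered once, the position moved past it, and resume() picks up from the position, so the tool still reports LAG = 1 against the committed offset.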
Thank you, that explains it clearly. That was my big blind spot. It seems that I can handle #2 by using the seek method of KafkaConsumer.
– Rick
11 hours ago
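The seek-based workaround mentioned in the comment above can be sketched with a small in-memory model (again a simulation for illustration, not the real client; ToySeekConsumer and its fields are hypothetical names). Rewinding the position to the last committed offset makes the delivered-but-uncommitted record pollable again, which corresponds to the real KafkaConsumer.seek(TopicPartition, offset) call.

```python
# Toy model (NOT the real Kafka client) of the seek() workaround: rewind
# the position to the committed offset so the uncommitted record is
# delivered again on the next poll().

class ToySeekConsumer:
    def __init__(self, log):
        self.log = log
        self.position = 0         # offset of the next record poll() returns
        self.committed = 0        # last committed offset

    def poll(self):
        records = self.log[self.position:]
        self.position = len(self.log)
        return records

    def seek(self, offset):
        self.position = offset    # the next poll() starts from here


c = ToySeekConsumer(log=["m0"])
c.poll()                 # m0 delivered but never committed; position is now 1
c.seek(c.committed)      # rewind to the committed offset (0)
again = c.poll()
print(again)             # ['m0'] -> the "missing" record is delivered again
```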
answered 14 hours ago
AbhishekN