How to create kafka compacted topic
I have a Kafka
application that has a producer
who produces messages to a topic. A consumer
then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord
and ConsumerRecords
.
I want my app to create 2 compacted topics
and then use them. If the compacted topics
already exist, just display a message and continue.
My SimpleProducer
class:
package com.kafkatest.demo;
import java.util.*;
import org.apache.kafka.clients.producer.*;

/**
 * Publishes six String records to the "nodesTopic" topic: one with key "Key2"
 * and five with key "Key1". On a compacted topic only the newest record per
 * key survives log cleaning, so these six would eventually collapse to two.
 */
public class SimpleProducer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize,
    // so `java SimpleProducer` would fail with "main method not found".
    public static void main(String[] args) throws Exception {
        String topicName = "nodesTopic";
        String key = "Key1";
        String value = "Value-1";
        String key1 = "Key2";
        String value1 = "Value-2";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer even when a
        // send(...) throws (the original leaked the producer on failure).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.send(new ProducerRecord<>(topicName, key1, value1));
            // Four more records with the duplicate key, identical to the
            // original's record3..record6.
            for (int i = 0; i < 4; i++) {
                producer.send(new ProducerRecord<>(topicName, key, value));
            }
        }
        System.out.println("SimpleProducer Completed.");
    }
}
My SimpleConsumer class:
package com.kafkatest.demo;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Endless pipeline stage: consumes from "nodesTopic", lower-cases each value
 * and republishes it -- together with the size of the polled batch -- to
 * "forecastTopic". Runs until the process is killed or the thread is
 * interrupted; both Kafka clients are closed on the way out.
 */
public class SimpleConsumer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize.
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092,localhost:9093");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        Duration pollTimeout = Duration.of(2, ChronoUnit.MINUTES);
        String topicName = "nodesTopic";

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "consumer-tutorial");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // FIX: restore the interrupt flag and exit instead of
                    // swallowing the interruption, so the loop can be stopped
                    // cooperatively; the finally block still closes both clients.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Removed the original consumer.beginningOffsets(consumer.assignment())
                // call: its return value was discarded, so it only added a
                // blocking round-trip to the broker per iteration.
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                // records.count() is constant for the whole batch; build the
                // String once instead of once per record.
                String batchSize = "" + records.count();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                    System.out.println("Record: " + record.value().toLowerCase());
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + record.value().toLowerCase()));
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + batchSize));
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning
and I run my producer a couple of times, I get
0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6
I put the log.cleanup.policy=compact
in the server.properties
file, but it doesn't seem to work, because I have all the 83 offsets in the topic.
Thank you.
java apache-kafka
|
show 4 more comments
I have a Kafka
application that has a producer
who produces messages to a topic. A consumer
then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord
and ConsumerRecords
.
I want my app to create 2 compacted topics
and then use them. If the compacted topics
already exist, just display a message and continue.
My SimpleProducer
class:
package com.kafkatest.demo;
import java.util.*;
import org.apache.kafka.clients.producer.*;

/**
 * Publishes six String records to the "nodesTopic" topic: one with key "Key2"
 * and five with key "Key1". On a compacted topic only the newest record per
 * key survives log cleaning, so these six would eventually collapse to two.
 */
public class SimpleProducer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize,
    // so `java SimpleProducer` would fail with "main method not found".
    public static void main(String[] args) throws Exception {
        String topicName = "nodesTopic";
        String key = "Key1";
        String value = "Value-1";
        String key1 = "Key2";
        String value1 = "Value-2";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer even when a
        // send(...) throws (the original leaked the producer on failure).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.send(new ProducerRecord<>(topicName, key1, value1));
            // Four more records with the duplicate key, identical to the
            // original's record3..record6.
            for (int i = 0; i < 4; i++) {
                producer.send(new ProducerRecord<>(topicName, key, value));
            }
        }
        System.out.println("SimpleProducer Completed.");
    }
}
My SimpleConsumer class:
package com.kafkatest.demo;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Endless pipeline stage: consumes from "nodesTopic", lower-cases each value
 * and republishes it -- together with the size of the polled batch -- to
 * "forecastTopic". Runs until the process is killed or the thread is
 * interrupted; both Kafka clients are closed on the way out.
 */
public class SimpleConsumer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize.
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092,localhost:9093");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        Duration pollTimeout = Duration.of(2, ChronoUnit.MINUTES);
        String topicName = "nodesTopic";

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "consumer-tutorial");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // FIX: restore the interrupt flag and exit instead of
                    // swallowing the interruption, so the loop can be stopped
                    // cooperatively; the finally block still closes both clients.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Removed the original consumer.beginningOffsets(consumer.assignment())
                // call: its return value was discarded, so it only added a
                // blocking round-trip to the broker per iteration.
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                // records.count() is constant for the whole batch; build the
                // String once instead of once per record.
                String batchSize = "" + records.count();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                    System.out.println("Record: " + record.value().toLowerCase());
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + record.value().toLowerCase()));
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + batchSize));
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning
and I run my producer a couple of time, I get
0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6
I put the log.cleanup.policy=compact
in the server.properties
file, but it doesn't seem to work, because I have all the 83 offsets in the topic.
Thank you.
java apache-kafka
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
I found log.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in the Kafka
config ?
– Milan Panic
Nov 20 '18 at 10:53
1
You should place it under /etc/kafka/server.properties
.
– Giorgos Myrianthous
Nov 20 '18 at 10:56
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01
|
show 4 more comments
I have a Kafka
application that has a producer
who produces messages to a topic. A consumer
then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord
and ConsumerRecords
.
I want my app to create 2 compacted topics
and then use them. If the compacted topics
already exist, just display a message and continue.
My SimpleProducer
class:
package com.kafkatest.demo;
import java.util.*;
import org.apache.kafka.clients.producer.*;

/**
 * Publishes six String records to the "nodesTopic" topic: one with key "Key2"
 * and five with key "Key1". On a compacted topic only the newest record per
 * key survives log cleaning, so these six would eventually collapse to two.
 */
public class SimpleProducer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize,
    // so `java SimpleProducer` would fail with "main method not found".
    public static void main(String[] args) throws Exception {
        String topicName = "nodesTopic";
        String key = "Key1";
        String value = "Value-1";
        String key1 = "Key2";
        String value1 = "Value-2";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer even when a
        // send(...) throws (the original leaked the producer on failure).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.send(new ProducerRecord<>(topicName, key1, value1));
            // Four more records with the duplicate key, identical to the
            // original's record3..record6.
            for (int i = 0; i < 4; i++) {
                producer.send(new ProducerRecord<>(topicName, key, value));
            }
        }
        System.out.println("SimpleProducer Completed.");
    }
}
My SimpleConsumer class:
package com.kafkatest.demo;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Endless pipeline stage: consumes from "nodesTopic", lower-cases each value
 * and republishes it -- together with the size of the polled batch -- to
 * "forecastTopic". Runs until the process is killed or the thread is
 * interrupted; both Kafka clients are closed on the way out.
 */
public class SimpleConsumer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize.
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092,localhost:9093");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        Duration pollTimeout = Duration.of(2, ChronoUnit.MINUTES);
        String topicName = "nodesTopic";

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "consumer-tutorial");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // FIX: restore the interrupt flag and exit instead of
                    // swallowing the interruption, so the loop can be stopped
                    // cooperatively; the finally block still closes both clients.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Removed the original consumer.beginningOffsets(consumer.assignment())
                // call: its return value was discarded, so it only added a
                // blocking round-trip to the broker per iteration.
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                // records.count() is constant for the whole batch; build the
                // String once instead of once per record.
                String batchSize = "" + records.count();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                    System.out.println("Record: " + record.value().toLowerCase());
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + record.value().toLowerCase()));
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + batchSize));
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning
and I run my producer a couple of time, I get
0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6
I put the log.cleanup.policy=compact
in the server.properties
file, but it doesn't seem to work, because I have all the 83 offsets in the topic.
Thank you.
java apache-kafka
I have a Kafka
application that has a producer
who produces messages to a topic. A consumer
then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord
and ConsumerRecords
.
I want my app to create 2 compacted topics
and then use them. If the compacted topics
already exist, just display a message and continue.
My SimpleProducer
class:
package com.kafkatest.demo;
import java.util.*;
import org.apache.kafka.clients.producer.*;

/**
 * Publishes six String records to the "nodesTopic" topic: one with key "Key2"
 * and five with key "Key1". On a compacted topic only the newest record per
 * key survives log cleaning, so these six would eventually collapse to two.
 */
public class SimpleProducer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize,
    // so `java SimpleProducer` would fail with "main method not found".
    public static void main(String[] args) throws Exception {
        String topicName = "nodesTopic";
        String key = "Key1";
        String value = "Value-1";
        String key1 = "Key2";
        String value1 = "Value-2";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer even when a
        // send(...) throws (the original leaked the producer on failure).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topicName, key, value));
            producer.send(new ProducerRecord<>(topicName, key1, value1));
            // Four more records with the duplicate key, identical to the
            // original's record3..record6.
            for (int i = 0; i < 4; i++) {
                producer.send(new ProducerRecord<>(topicName, key, value));
            }
        }
        System.out.println("SimpleProducer Completed.");
    }
}
My SimpleConsumer class:
package com.kafkatest.demo;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Endless pipeline stage: consumes from "nodesTopic", lower-cases each value
 * and republishes it -- together with the size of the polled batch -- to
 * "forecastTopic". Runs until the process is killed or the thread is
 * interrupted; both Kafka clients are closed on the way out.
 */
public class SimpleConsumer extends Thread {

    // FIX: a JVM entry point must be main(String[] args); the original
    // declared main(String args), which the launcher does not recognize.
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092,localhost:9093");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        Duration pollTimeout = Duration.of(2, ChronoUnit.MINUTES);
        String topicName = "nodesTopic";

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "consumer-tutorial");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList(topicName));

        try {
            while (true) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // FIX: restore the interrupt flag and exit instead of
                    // swallowing the interruption, so the loop can be stopped
                    // cooperatively; the finally block still closes both clients.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Removed the original consumer.beginningOffsets(consumer.assignment())
                // call: its return value was discarded, so it only added a
                // blocking round-trip to the broker per iteration.
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
                // records.count() is constant for the whole batch; build the
                // String once instead of once per record.
                String batchSize = "" + records.count();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                    System.out.println("Record: " + record.value().toLowerCase());
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + record.value().toLowerCase()));
                    producer.send(new ProducerRecord<>("forecastTopic", "Key",
                            record.offset() + ". " + batchSize));
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning
and I run my producer a couple of time, I get
0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6
I put the log.cleanup.policy=compact
in the server.properties
file, but it doesn't seem to work, because I have all the 83 offsets in the topic.
Thank you.
java apache-kafka
java apache-kafka
edited Nov 20 '18 at 15:19
cricket_007
81.9k1143111
81.9k1143111
asked Nov 20 '18 at 9:44
Milan PanicMilan Panic
548
548
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
I foundlog.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in theKafka
config ?
– Milan Panic
Nov 20 '18 at 10:53
1
You should place it under/etc/kafka/server.properties
.
– Giorgos Myrianthous
Nov 20 '18 at 10:56
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01
|
show 4 more comments
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
I foundlog.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in theKafka
config ?
– Milan Panic
Nov 20 '18 at 10:53
1
You should place it under/etc/kafka/server.properties
.
– Giorgos Myrianthous
Nov 20 '18 at 10:56
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
I found
log.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in the Kafka
config ?– Milan Panic
Nov 20 '18 at 10:53
I found
log.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in the Kafka
config ?– Milan Panic
Nov 20 '18 at 10:53
1
1
You should place it under
/etc/kafka/server.properties
.– Giorgos Myrianthous
Nov 20 '18 at 10:56
You should place it under
/etc/kafka/server.properties
.– Giorgos Myrianthous
Nov 20 '18 at 10:56
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01
|
show 4 more comments
1 Answer
1
active
oldest
votes
When you set log.cleanup.policy=compact
in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.
You can alter your topic configuration to set cleanup.policy=compact
As compaction is made by log cleaner, you may want to set specific delete.retention.ms
on your topic as default retention is 24 hours.
Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting
I deleted all the kafka logs
and started new with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again. These are my settings: log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, you mean no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?
– Gery
Nov 22 '18 at 10:55
add a comment |
Your Answer
// NOTE(review): scraped StackExchange page bootstrap script, not application
// code. Kept byte-identical; comments only. Initializes the inline
// code-snippet runner when the "editor" module is available.
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
// Configures and creates the Markdown answer editor for this site once the
// page is ready; the editor is deferred behind the snippets module when
// snippets are enabled.
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
// Builds the editor. NOTE(review): the imageUploader HTML strings below were
// mangled by scraping (raw u003c/u003e escapes and unescaped inner quotes)
// and are not valid JavaScript as captured.
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53390170%2fhow-to-create-kafka-compacted-topic%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
When you set log.cleanup.policy=compact
in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.
You can alter your topic configuration to set cleanup.policy=compact
As compaction is made by log cleaner, you may want to set specific delete.retention.ms
on your topic as default retention is 24 hours.
Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting
I deleted all the kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings:log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
add a comment |
When you set log.cleanup.policy=compact
in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.
You can alter your topic configuration to set cleanup.policy=compact
As compaction is made by log cleaner, you may want to set specific delete.retention.ms
on your topic as default retention is 24 hours.
Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting
I deleted all the kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings:log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
add a comment |
When you set log.cleanup.policy=compact
in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.
You can alter your topic configuration to set cleanup.policy=compact
As compaction is made by log cleaner, you may want to set specific delete.retention.ms
on your topic as default retention is 24 hours.
Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting
When you set log.cleanup.policy=compact
in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.
You can alter your topic configuration to set cleanup.policy=compact
As compaction is made by log cleaner, you may want to set specific delete.retention.ms
on your topic as default retention is 24 hours.
Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting
answered Nov 20 '18 at 17:51
GeryGery
31116
31116
I deleted all the kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings:log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
add a comment |
I deleted all the kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings:log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
I deleted all the
kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings: log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
I deleted all the
kafka logs
and started anew with the cleanup.policy=compact
in server.properties
, created the topics and it is not working again, These are my settings: log.cleaner.enable=true
min.cleanable.dirty.ratio=0.01
log.cleanup.policy=compact
segment.ms=100
delete.retention.ms=100
– Milan Panic
Nov 21 '18 at 10:10
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
When you say it is not working, do you mean that no matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic?
– Gery
Nov 22 '18 at 10:55
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53390170%2fhow-to-create-kafka-compacted-topic%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
What did you try so far?
– Giorgos Myrianthous
Nov 20 '18 at 10:50
I found
log.cleanup.policy=compact
, but I don't know where to put it. Is it somewhere in theKafka
config ?– Milan Panic
Nov 20 '18 at 10:53
1
You should place it under
/etc/kafka/server.properties
.– Giorgos Myrianthous
Nov 20 '18 at 10:56
Does it matter where I place it in this file?
– Milan Panic
Nov 20 '18 at 11:00
No, it doesn't matter.
– Giorgos Myrianthous
Nov 20 '18 at 11:01