How to create kafka compacted topic












0















I have a Kafka application that has a producer who produces messages to a topic. A consumer then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord and ConsumerRecords.



I want my app to create 2 compacted topics and then use them. If the compacted topics already exist, just display a message and continue.



My SimpleProducer class:



  package com.kafkatest.demo;

import java.util.*;

import org.apache.kafka.clients.producer.*;
public class SimpleProducer extends Thread{

public static void main(String args) throws Exception{

String topicName = "nodesTopic";
String key = "Key1";
String value = "Value-1";

String key1 = "Key2";
String value1 = "Value-2";



Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props);

ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);


ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName,key1,value1);
producer.send(record2);


ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName,key,value);
producer.send(record3);


ProducerRecord<String, String> record4 = new ProducerRecord<>(topicName,key,value);
producer.send(record4);


ProducerRecord<String, String> record5 = new ProducerRecord<>(topicName,key,value);
producer.send(record5);


ProducerRecord<String, String> record6 = new ProducerRecord<>(topicName,key,value);
producer.send(record6);
producer.close();

System.out.println("SimpleProducer Completed.");
}
}


My SimpleConsumer class:



   package com.kafkatest.demo;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleConsumer extends Thread{

public static void main(String args) {

Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092,localhost:9093");
props1.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props1);

Duration duration = Duration.of(2, ChronoUnit.MINUTES);
String topicName = "nodesTopic";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(topicName));

try {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.beginningOffsets(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
System.out.println("Record: " + record.value().toLowerCase());
ProducerRecord<String, String> record1 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + record.value().toLowerCase());
String a = "" + records.count();
ProducerRecord<String, String> record2 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + a);
producer.send(record1);
producer.send(record2);
}
}
} finally {
producer.close();
consumer.close();
}

}

}


When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning and I run my producer a couple of time, I get



0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6


I put the log.cleanup.policy=compact in the server.properties file, but it doesn't seem to work, because I have all the 83 offsets in the topic.



Thank you.










share|improve this question

























  • What did you try so far?

    – Giorgos Myrianthous
    Nov 20 '18 at 10:50











  • I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

    – Milan Panic
    Nov 20 '18 at 10:53






  • 1





    You should place it under /etc/kafka/server.properties.

    – Giorgos Myrianthous
    Nov 20 '18 at 10:56











  • Does it matter where I place it in this file?

    – Milan Panic
    Nov 20 '18 at 11:00











  • No, it doesn't matter.

    – Giorgos Myrianthous
    Nov 20 '18 at 11:01
















0















I have a Kafka application that has a producer who produces messages to a topic. A consumer then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord and ConsumerRecords.



I want my app to create 2 compacted topics and then use them. If the compacted topics already exist, just display a message and continue.



My SimpleProducer class:



  package com.kafkatest.demo;

import java.util.*;

import org.apache.kafka.clients.producer.*;
public class SimpleProducer extends Thread{

public static void main(String args) throws Exception{

String topicName = "nodesTopic";
String key = "Key1";
String value = "Value-1";

String key1 = "Key2";
String value1 = "Value-2";



Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props);

ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);


ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName,key1,value1);
producer.send(record2);


ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName,key,value);
producer.send(record3);


ProducerRecord<String, String> record4 = new ProducerRecord<>(topicName,key,value);
producer.send(record4);


ProducerRecord<String, String> record5 = new ProducerRecord<>(topicName,key,value);
producer.send(record5);


ProducerRecord<String, String> record6 = new ProducerRecord<>(topicName,key,value);
producer.send(record6);
producer.close();

System.out.println("SimpleProducer Completed.");
}
}


My SimpleConsumer class:



   package com.kafkatest.demo;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleConsumer extends Thread{

public static void main(String args) {

Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092,localhost:9093");
props1.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props1);

Duration duration = Duration.of(2, ChronoUnit.MINUTES);
String topicName = "nodesTopic";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(topicName));

try {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.beginningOffsets(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
System.out.println("Record: " + record.value().toLowerCase());
ProducerRecord<String, String> record1 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + record.value().toLowerCase());
String a = "" + records.count();
ProducerRecord<String, String> record2 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + a);
producer.send(record1);
producer.send(record2);
}
}
} finally {
producer.close();
consumer.close();
}

}

}


When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning and I run my producer a couple of time, I get



0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6


I put the log.cleanup.policy=compact in the server.properties file, but it doesn't seem to work, because I have all the 83 offsets in the topic.



Thank you.










share|improve this question

























  • What did you try so far?

    – Giorgos Myrianthous
    Nov 20 '18 at 10:50











  • I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

    – Milan Panic
    Nov 20 '18 at 10:53






  • 1





    You should place it under /etc/kafka/server.properties.

    – Giorgos Myrianthous
    Nov 20 '18 at 10:56











  • Does it matter where I place it in this file?

    – Milan Panic
    Nov 20 '18 at 11:00











  • No, it doesn't matter.

    – Giorgos Myrianthous
    Nov 20 '18 at 11:01














0












0








0








I have a Kafka application that has a producer who produces messages to a topic. A consumer then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord and ConsumerRecords.



I want my app to create 2 compacted topics and then use them. If the compacted topics already exist, just display a message and continue.



My SimpleProducer class:



  package com.kafkatest.demo;

import java.util.*;

import org.apache.kafka.clients.producer.*;
public class SimpleProducer extends Thread{

public static void main(String args) throws Exception{

String topicName = "nodesTopic";
String key = "Key1";
String value = "Value-1";

String key1 = "Key2";
String value1 = "Value-2";



Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props);

ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);


ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName,key1,value1);
producer.send(record2);


ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName,key,value);
producer.send(record3);


ProducerRecord<String, String> record4 = new ProducerRecord<>(topicName,key,value);
producer.send(record4);


ProducerRecord<String, String> record5 = new ProducerRecord<>(topicName,key,value);
producer.send(record5);


ProducerRecord<String, String> record6 = new ProducerRecord<>(topicName,key,value);
producer.send(record6);
producer.close();

System.out.println("SimpleProducer Completed.");
}
}


My SimpleConsumer class:



   package com.kafkatest.demo;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleConsumer extends Thread{

public static void main(String args) {

Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092,localhost:9093");
props1.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props1);

Duration duration = Duration.of(2, ChronoUnit.MINUTES);
String topicName = "nodesTopic";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(topicName));

try {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.beginningOffsets(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
System.out.println("Record: " + record.value().toLowerCase());
ProducerRecord<String, String> record1 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + record.value().toLowerCase());
String a = "" + records.count();
ProducerRecord<String, String> record2 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + a);
producer.send(record1);
producer.send(record2);
}
}
} finally {
producer.close();
consumer.close();
}

}

}


When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning and I run my producer a couple of time, I get



0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6


I put the log.cleanup.policy=compact in the server.properties file, but it doesn't seem to work, because I have all the 83 offsets in the topic.



Thank you.










share|improve this question
















I have a Kafka application that has a producer who produces messages to a topic. A consumer then takes the messages from the topic, does some logic to the given messages and then produces them to another topic.
I'm using ProducerRecord and ConsumerRecords.



I want my app to create 2 compacted topics and then use them. If the compacted topics already exist, just display a message and continue.



My SimpleProducer class:



  package com.kafkatest.demo;

import java.util.*;

import org.apache.kafka.clients.producer.*;
public class SimpleProducer extends Thread{

public static void main(String args) throws Exception{

String topicName = "nodesTopic";
String key = "Key1";
String value = "Value-1";

String key1 = "Key2";
String value1 = "Value-2";



Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props);

ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);


ProducerRecord<String, String> record2 = new ProducerRecord<>(topicName,key1,value1);
producer.send(record2);


ProducerRecord<String, String> record3 = new ProducerRecord<>(topicName,key,value);
producer.send(record3);


ProducerRecord<String, String> record4 = new ProducerRecord<>(topicName,key,value);
producer.send(record4);


ProducerRecord<String, String> record5 = new ProducerRecord<>(topicName,key,value);
producer.send(record5);


ProducerRecord<String, String> record6 = new ProducerRecord<>(topicName,key,value);
producer.send(record6);
producer.close();

System.out.println("SimpleProducer Completed.");
}
}


My SimpleConsumer class:



   package com.kafkatest.demo;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleConsumer extends Thread{

public static void main(String args) {

Properties props1 = new Properties();
props1.put("bootstrap.servers", "localhost:9092,localhost:9093");
props1.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer <>(props1);

Duration duration = Duration.of(2, ChronoUnit.MINUTES);
String topicName = "nodesTopic";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(topicName));

try {
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.beginningOffsets(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
System.out.println("Record: " + record.value().toLowerCase());
ProducerRecord<String, String> record1 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + record.value().toLowerCase());
String a = "" + records.count();
ProducerRecord<String, String> record2 = new ProducerRecord<>("forecastTopic", "Key", record.offset() + ". " + a);
producer.send(record1);
producer.send(record2);
}
}
} finally {
producer.close();
consumer.close();
}

}

}


When I run bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic forecastTopic --from-beginning and I run my producer a couple of time, I get



0. value-1
0. 6
1. value-2
1. 6
2. value-1
2. 6
3. value-1
3. 6
4. value-1
4. 6
5. value-1
5. 6
6. value-1
6. 6
7. value-2
7. 6
8. value-1
8. 6
9. value-1
9. 6
10. value-1
10. 6
11. value-1
11. 6
12. value-1
12. 6
13. value-2
13. 6
14. value-1
14. 6
15. value-1
15. 6
16. value-1
16. 6
17. value-1
17. 6
18. value-1
18. 6
19. value-2
19. 6
20. value-1
20. 6
21. value-1
21. 6
22. value-1
22. 6
23. value-1
23. 6
24. value-1
24. 6
25. value-2
25. 6
26. value-1
26. 6
27. value-1
27. 6
28. value-1
28. 6
29. value-1
29. 6
30. value-1
30. 6
31. value-2
31. 6
32. value-1
32. 6
33. value-1
33. 6
34. value-1
34. 6
35. value-1
35. 6
36. value-1
36. 6
37. value-2
37. 6
38. value-1
38. 6
39. value-1
39. 6
40. value-1
40. 6
41. value-1
41. 6
42. value-1
42. 6
43. value-2
43. 6
44. value-1
44. 6
45. value-1
45. 6
46. value-1
46. 6
47. value-1
47. 6
48. value-1
48. 12
49. value-2
49. 12
50. value-1
50. 12
51. value-1
51. 12
52. value-1
52. 12
53. value-1
53. 12
54. value-1
54. 12
55. value-2
55. 12
56. value-1
56. 12
57. value-1
57. 12
58. value-1
58. 12
59. value-1
59. 12
60. value-1
60. 6
61. value-2
61. 6
62. value-1
62. 6
63. value-1
63. 6
64. value-1
64. 6
65. value-1
65. 6
66. value-1
66. 6
67. value-2
67. 6
68. value-1
68. 6
69. value-1
69. 6
70. value-1
70. 6
71. value-1
71. 6
72. value-1
72. 6
73. value-2
73. 6
74. value-1
74. 6
75. value-1
75. 6
76. value-1
76. 6
77. value-1
77. 6
78. value-1
78. 6
79. value-2
79. 6
80. value-1
80. 6
81. value-1
81. 6
82. value-1
82. 6
83. value-1
83. 6


I put the log.cleanup.policy=compact in the server.properties file, but it doesn't seem to work, because I have all the 83 offsets in the topic.



Thank you.







java apache-kafka






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 20 '18 at 15:19









cricket_007

81.9k1143111




81.9k1143111










asked Nov 20 '18 at 9:44









Milan PanicMilan Panic

548




548













  • What did you try so far?

    – Giorgos Myrianthous
    Nov 20 '18 at 10:50











  • I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

    – Milan Panic
    Nov 20 '18 at 10:53






  • 1





    You should place it under /etc/kafka/server.properties.

    – Giorgos Myrianthous
    Nov 20 '18 at 10:56











  • Does it matter where I place it in this file?

    – Milan Panic
    Nov 20 '18 at 11:00











  • No, it doesn't matter.

    – Giorgos Myrianthous
    Nov 20 '18 at 11:01



















  • What did you try so far?

    – Giorgos Myrianthous
    Nov 20 '18 at 10:50











  • I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

    – Milan Panic
    Nov 20 '18 at 10:53






  • 1





    You should place it under /etc/kafka/server.properties.

    – Giorgos Myrianthous
    Nov 20 '18 at 10:56











  • Does it matter where I place it in this file?

    – Milan Panic
    Nov 20 '18 at 11:00











  • No, it doesn't matter.

    – Giorgos Myrianthous
    Nov 20 '18 at 11:01

















What did you try so far?

– Giorgos Myrianthous
Nov 20 '18 at 10:50





What did you try so far?

– Giorgos Myrianthous
Nov 20 '18 at 10:50













I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

– Milan Panic
Nov 20 '18 at 10:53





I found log.cleanup.policy=compact, but I don't know where to put it. Is it somewhere in the Kafka config ?

– Milan Panic
Nov 20 '18 at 10:53




1




1





You should place it under /etc/kafka/server.properties.

– Giorgos Myrianthous
Nov 20 '18 at 10:56





You should place it under /etc/kafka/server.properties.

– Giorgos Myrianthous
Nov 20 '18 at 10:56













Does it matter where I place it in this file?

– Milan Panic
Nov 20 '18 at 11:00





Does it matter where I place it in this file?

– Milan Panic
Nov 20 '18 at 11:00













No, it doesn't matter.

– Giorgos Myrianthous
Nov 20 '18 at 11:01





No, it doesn't matter.

– Giorgos Myrianthous
Nov 20 '18 at 11:01












1 Answer
1






active

oldest

votes


















1














When you set log.cleanup.policy=compact in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.



You can alter your topic configuration to set cleanup.policy=compact



As compaction is made by log cleaner, you may want to set specific delete.retention.ms on your topic as default retention is 24 hours.



Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting






share|improve this answer
























  • I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

    – Milan Panic
    Nov 21 '18 at 10:10













  • When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

    – Gery
    Nov 22 '18 at 10:55











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53390170%2fhow-to-create-kafka-compacted-topic%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









1














When you set log.cleanup.policy=compact in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.



You can alter your topic configuration to set cleanup.policy=compact



As compaction is made by log cleaner, you may want to set specific delete.retention.ms on your topic as default retention is 24 hours.



Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting






share|improve this answer
























  • I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

    – Milan Panic
    Nov 21 '18 at 10:10













  • When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

    – Gery
    Nov 22 '18 at 10:55
















1














When you set log.cleanup.policy=compact in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.



You can alter your topic configuration to set cleanup.policy=compact



As compaction is made by log cleaner, you may want to set specific delete.retention.ms on your topic as default retention is 24 hours.



Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting






share|improve this answer
























  • I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

    – Milan Panic
    Nov 21 '18 at 10:10













  • When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

    – Gery
    Nov 22 '18 at 10:55














1












1








1







When you set log.cleanup.policy=compact in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.



You can alter your topic configuration to set cleanup.policy=compact



As compaction is made by log cleaner, you may want to set specific delete.retention.ms on your topic as default retention is 24 hours.



Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting






share|improve this answer













When you set log.cleanup.policy=compact in server.properties, it will be the default policy when creating new topics. If you change server.properties after creating your topic, your topic configuration won't change.



You can alter your topic configuration to set cleanup.policy=compact



As compaction is made by log cleaner, you may want to set specific delete.retention.ms on your topic as default retention is 24 hours.



Last, compaction doesn't occur on active segment.
see Kafka Log Compaction not starting







share|improve this answer












share|improve this answer



share|improve this answer










answered Nov 20 '18 at 17:51









GeryGery

31116




31116













  • I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

    – Milan Panic
    Nov 21 '18 at 10:10













  • When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

    – Gery
    Nov 22 '18 at 10:55



















  • I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

    – Milan Panic
    Nov 21 '18 at 10:10













  • When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

    – Gery
    Nov 22 '18 at 10:55

















I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

– Milan Panic
Nov 21 '18 at 10:10







I deleted all the kafka logs and started new with the cleanup.policy=compact in server.properties, created the topics and it is not working again, These are my settings: log.cleaner.enable=true min.cleanable.dirty.ratio=0.01 log.cleanup.policy=compact segment.ms=100 delete.retention.ms=100

– Milan Panic
Nov 21 '18 at 10:10















When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

– Gery
Nov 22 '18 at 10:55





When you say it is not working, you mean not matter how long you wait, your consumer never consumes only two records corresponding to the two keys in the topic ?

– Gery
Nov 22 '18 at 10:55




















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53390170%2fhow-to-create-kafka-compacted-topic%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

Biblatex bibliography style without URLs when DOI exists (in Overleaf with Zotero bibliography)

ComboBox Display Member on multiple fields

Is it possible to collect Nectar points via Trainline?