Kafka Producer does not write to kafka topic












0















Sorry for the noob question: I am writing to Kafka using Akka, but I cannot see the messages in the Kafka console consumer.



Config for writing to kafka:



kafka {
bootstrap.servers = "localhost:9002"
auto.offset.reset = "earliest"
}


I have code to write to a kafka topic using akka:



/**
 * Publishes string records to a single Kafka topic.
 *
 * The broker address is read from the `kafka.bootstrap.servers` configuration key when the
 * instance is constructed.
 *
 * @param topicName     topic every record is written to
 * @param actorSystem   actor system handed to `ProducerSettings`
 * @param configuration configuration that must define `kafka.bootstrap.servers`
 * @throws Exception if `kafka.bootstrap.servers` is not configured
 */
class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem, configuration: Configuration) {
  import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

  val bootstrapServers: String = configuration
    .getString("kafka.bootstrap.servers")
    .getOrElse(
      throw new Exception("No config element for kafka.bootstrap.servers")
    )

  val producerSettings: ProducerSettings[String, String] = ProducerSettings(
    actorSystem,
    new StringSerializer,
    new StringSerializer
  ).withBootstrapServers(bootstrapServers)

  val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer()

  // Logs the outcome of each send. Without a callback the result of `producer.send` is
  // discarded, so a failed delivery (for example an unreachable broker) goes unnoticed.
  private val logSendResult: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      Option(exception) match {
        case Some(failure) =>
          Logger.error(s"Failed to write to $topicName via $bootstrapServers", failure)
        case None =>
          Logger.debug(s"Wrote to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
      }
  }

  /**
   * Hands one record to the producer for `topicName`; the outcome is logged by the callback.
   *
   * @param logRecordStr record value to publish (no key is set)
   */
  def send(logRecordStr: String): Unit = {
    Logger.debug(s"Inside ServiceKafkaProducer, writing to $topicName")
    Logger.debug(logRecordStr)
    producer.send(
      new ProducerRecord[String, String](topicName, logRecordStr),
      logSendResult
    )
  }

  /** Closes the underlying producer; call once when the application shuts down. */
  def close(): Unit = producer.close()
}

/**
 * Builds a `TagCreated` event with a freshly generated id, prints it, wraps it with
 * `createLogRecord` and passes the encoded record to `send`.
 *
 * @param text      text of the new tag
 * @param createdBy id passed through to the event as its creator
 */
def createTag(text: String, createdBy: UUID): Unit = {
  Logger.debug("Inside TagEventProducer#createTag")

  val tagCreated = TagCreated(UUID.randomUUID(), text, createdBy)
  println(tagCreated)

  send(createLogRecord(tagCreated).encode)
}


LOGS



```[debug] - application - Inside TagEventProducer#createTag
TagCreated(d393d223-9eb6-45e3-8610-56a3f65c84cc,scala,f5b61ca0-0ccc-4064-94c1-cba2a5a4087b)
[debug] - application - Inside ServiceKafkaProducer, writing to tags
[debug] - application - {"id":"ed27f0d1-6b6c-469b-af97-1929dc6a5cc7","action":"tag-created","data":{"id":"d393d223-9eb6-45e3-8610-56a3f65c84cc","text":"scala","createdBy":"f5b61ca0-0ccc-4064-94c1-cba2a5a4087b"},"timestamp":1542776716868}```


(edited)
I am running kafka using the spotify docker image like this:



# Two services attached to the kafka_net network, which is created under the name my_network.
version: '3.5'
services:
  # Kafka broker; ports 9092 and 2181 are published to the host.
  kafka:
    image: 'spotify/kafka'
    hostname: kafka
    environment:
      - ADVERTISED_HOST=kafka
      - ADVERTISED_PORT=9092
    ports:
      - "9092:9092"
      - "2181:2181"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_net
  # Kafka Manager UI, pointed at kafka:2181 through ZK_HOSTS.
  kafkaManager:
    image: 'sheepkiller/kafka-manager'
    environment:
      - ZK_HOSTS=kafka:2181
      - APPLICATION_SECRET=letmein
    ports:
      - "8000:8000"
    networks:
      - kafka_net
networks:
  kafka_net:
    name: my_network









share|improve this question





























    0















    Sorry for the noob question: I am writing to Kafka using Akka, but I cannot see the messages in the Kafka console consumer.



    Config for writing to kafka:



    kafka {
    bootstrap.servers = "localhost:9002"
    auto.offset.reset = "earliest"
    }


    I have code to write to a kafka topic using akka:



    /**
     * Publishes string records to a single Kafka topic.
     *
     * The broker address is read from the `kafka.bootstrap.servers` configuration key when the
     * instance is constructed.
     *
     * @param topicName     topic every record is written to
     * @param actorSystem   actor system handed to `ProducerSettings`
     * @param configuration configuration that must define `kafka.bootstrap.servers`
     * @throws Exception if `kafka.bootstrap.servers` is not configured
     */
    class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem, configuration: Configuration) {
      import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

      val bootstrapServers: String = configuration
        .getString("kafka.bootstrap.servers")
        .getOrElse(
          throw new Exception("No config element for kafka.bootstrap.servers")
        )

      val producerSettings: ProducerSettings[String, String] = ProducerSettings(
        actorSystem,
        new StringSerializer,
        new StringSerializer
      ).withBootstrapServers(bootstrapServers)

      val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer()

      // Logs the outcome of each send. Without a callback the result of `producer.send` is
      // discarded, so a failed delivery (for example an unreachable broker) goes unnoticed.
      private val logSendResult: Callback = new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          Option(exception) match {
            case Some(failure) =>
              Logger.error(s"Failed to write to $topicName via $bootstrapServers", failure)
            case None =>
              Logger.debug(s"Wrote to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
          }
      }

      /**
       * Hands one record to the producer for `topicName`; the outcome is logged by the callback.
       *
       * @param logRecordStr record value to publish (no key is set)
       */
      def send(logRecordStr: String): Unit = {
        Logger.debug(s"Inside ServiceKafkaProducer, writing to $topicName")
        Logger.debug(logRecordStr)
        producer.send(
          new ProducerRecord[String, String](topicName, logRecordStr),
          logSendResult
        )
      }

      /** Closes the underlying producer; call once when the application shuts down. */
      def close(): Unit = producer.close()
    }

    /**
     * Builds a `TagCreated` event with a freshly generated id, prints it, wraps it with
     * `createLogRecord` and passes the encoded record to `send`.
     *
     * @param text      text of the new tag
     * @param createdBy id passed through to the event as its creator
     */
    def createTag(text: String, createdBy: UUID): Unit = {
      Logger.debug("Inside TagEventProducer#createTag")

      val tagCreated = TagCreated(UUID.randomUUID(), text, createdBy)
      println(tagCreated)

      send(createLogRecord(tagCreated).encode)
    }


    LOGS



    ```[debug] - application - Inside TagEventProducer#createTag
    TagCreated(d393d223-9eb6-45e3-8610-56a3f65c84cc,scala,f5b61ca0-0ccc-4064-94c1-cba2a5a4087b)
    [debug] - application - Inside ServiceKafkaProducer, writing to tags
    [debug] - application - {"id":"ed27f0d1-6b6c-469b-af97-1929dc6a5cc7","action":"tag-created","data":{"id":"d393d223-9eb6-45e3-8610-56a3f65c84cc","text":"scala","createdBy":"f5b61ca0-0ccc-4064-94c1-cba2a5a4087b"},"timestamp":1542776716868}```


    (edited)
    I am running kafka using the spotify docker image like this:



    # Two services attached to the kafka_net network, which is created under the name my_network.
    version: '3.5'
    services:
      # Kafka broker; ports 9092 and 2181 are published to the host.
      kafka:
        image: 'spotify/kafka'
        hostname: kafka
        environment:
          - ADVERTISED_HOST=kafka
          - ADVERTISED_PORT=9092
        ports:
          - "9092:9092"
          - "2181:2181"
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
        networks:
          - kafka_net
      # Kafka Manager UI, pointed at kafka:2181 through ZK_HOSTS.
      kafkaManager:
        image: 'sheepkiller/kafka-manager'
        environment:
          - ZK_HOSTS=kafka:2181
          - APPLICATION_SECRET=letmein
        ports:
          - "8000:8000"
        networks:
          - kafka_net
    networks:
      kafka_net:
        name: my_network









    share|improve this question



























      0












      0








      0








      Sorry for the noob question: I am writing to Kafka using Akka, but I cannot see the messages in the Kafka console consumer.



      Config for writing to kafka:



      kafka {
      bootstrap.servers = "localhost:9002"
      auto.offset.reset = "earliest"
      }


      I have code to write to a kafka topic using akka:



      /**
       * Publishes string records to a single Kafka topic.
       *
       * The broker address is read from the `kafka.bootstrap.servers` configuration key when the
       * instance is constructed.
       *
       * @param topicName     topic every record is written to
       * @param actorSystem   actor system handed to `ProducerSettings`
       * @param configuration configuration that must define `kafka.bootstrap.servers`
       * @throws Exception if `kafka.bootstrap.servers` is not configured
       */
      class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem, configuration: Configuration) {
        import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

        val bootstrapServers: String = configuration
          .getString("kafka.bootstrap.servers")
          .getOrElse(
            throw new Exception("No config element for kafka.bootstrap.servers")
          )

        val producerSettings: ProducerSettings[String, String] = ProducerSettings(
          actorSystem,
          new StringSerializer,
          new StringSerializer
        ).withBootstrapServers(bootstrapServers)

        val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer()

        // Logs the outcome of each send. Without a callback the result of `producer.send` is
        // discarded, so a failed delivery (for example an unreachable broker) goes unnoticed.
        private val logSendResult: Callback = new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            Option(exception) match {
              case Some(failure) =>
                Logger.error(s"Failed to write to $topicName via $bootstrapServers", failure)
              case None =>
                Logger.debug(s"Wrote to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
            }
        }

        /**
         * Hands one record to the producer for `topicName`; the outcome is logged by the callback.
         *
         * @param logRecordStr record value to publish (no key is set)
         */
        def send(logRecordStr: String): Unit = {
          Logger.debug(s"Inside ServiceKafkaProducer, writing to $topicName")
          Logger.debug(logRecordStr)
          producer.send(
            new ProducerRecord[String, String](topicName, logRecordStr),
            logSendResult
          )
        }

        /** Closes the underlying producer; call once when the application shuts down. */
        def close(): Unit = producer.close()
      }

      /**
       * Builds a `TagCreated` event with a freshly generated id, prints it, wraps it with
       * `createLogRecord` and passes the encoded record to `send`.
       *
       * @param text      text of the new tag
       * @param createdBy id passed through to the event as its creator
       */
      def createTag(text: String, createdBy: UUID): Unit = {
        Logger.debug("Inside TagEventProducer#createTag")

        val tagCreated = TagCreated(UUID.randomUUID(), text, createdBy)
        println(tagCreated)

        send(createLogRecord(tagCreated).encode)
      }


      LOGS



      ```[debug] - application - Inside TagEventProducer#createTag
      TagCreated(d393d223-9eb6-45e3-8610-56a3f65c84cc,scala,f5b61ca0-0ccc-4064-94c1-cba2a5a4087b)
      [debug] - application - Inside ServiceKafkaProducer, writing to tags
      [debug] - application - {"id":"ed27f0d1-6b6c-469b-af97-1929dc6a5cc7","action":"tag-created","data":{"id":"d393d223-9eb6-45e3-8610-56a3f65c84cc","text":"scala","createdBy":"f5b61ca0-0ccc-4064-94c1-cba2a5a4087b"},"timestamp":1542776716868}```


      (edited)
      I am running kafka using the spotify docker image like this:



      # Two services attached to the kafka_net network, which is created under the name my_network.
      version: '3.5'
      services:
        # Kafka broker; ports 9092 and 2181 are published to the host.
        kafka:
          image: 'spotify/kafka'
          hostname: kafka
          environment:
            - ADVERTISED_HOST=kafka
            - ADVERTISED_PORT=9092
          ports:
            - "9092:9092"
            - "2181:2181"
          volumes:
            - /var/run/docker.sock:/var/run/docker.sock
          networks:
            - kafka_net
        # Kafka Manager UI, pointed at kafka:2181 through ZK_HOSTS.
        kafkaManager:
          image: 'sheepkiller/kafka-manager'
          environment:
            - ZK_HOSTS=kafka:2181
            - APPLICATION_SECRET=letmein
          ports:
            - "8000:8000"
          networks:
            - kafka_net
      networks:
        kafka_net:
          name: my_network









      share|improve this question
















      Sorry for the noob question: I am writing to Kafka using Akka, but I cannot see the messages in the Kafka console consumer.



      Config for writing to kafka:



      kafka {
      bootstrap.servers = "localhost:9002"
      auto.offset.reset = "earliest"
      }


      I have code to write to a kafka topic using akka:



      /**
       * Publishes string records to a single Kafka topic.
       *
       * The broker address is read from the `kafka.bootstrap.servers` configuration key when the
       * instance is constructed.
       *
       * @param topicName     topic every record is written to
       * @param actorSystem   actor system handed to `ProducerSettings`
       * @param configuration configuration that must define `kafka.bootstrap.servers`
       * @throws Exception if `kafka.bootstrap.servers` is not configured
       */
      class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem, configuration: Configuration) {
        import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

        val bootstrapServers: String = configuration
          .getString("kafka.bootstrap.servers")
          .getOrElse(
            throw new Exception("No config element for kafka.bootstrap.servers")
          )

        val producerSettings: ProducerSettings[String, String] = ProducerSettings(
          actorSystem,
          new StringSerializer,
          new StringSerializer
        ).withBootstrapServers(bootstrapServers)

        val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer()

        // Logs the outcome of each send. Without a callback the result of `producer.send` is
        // discarded, so a failed delivery (for example an unreachable broker) goes unnoticed.
        private val logSendResult: Callback = new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            Option(exception) match {
              case Some(failure) =>
                Logger.error(s"Failed to write to $topicName via $bootstrapServers", failure)
              case None =>
                Logger.debug(s"Wrote to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
            }
        }

        /**
         * Hands one record to the producer for `topicName`; the outcome is logged by the callback.
         *
         * @param logRecordStr record value to publish (no key is set)
         */
        def send(logRecordStr: String): Unit = {
          Logger.debug(s"Inside ServiceKafkaProducer, writing to $topicName")
          Logger.debug(logRecordStr)
          producer.send(
            new ProducerRecord[String, String](topicName, logRecordStr),
            logSendResult
          )
        }

        /** Closes the underlying producer; call once when the application shuts down. */
        def close(): Unit = producer.close()
      }

      /**
       * Builds a `TagCreated` event with a freshly generated id, prints it, wraps it with
       * `createLogRecord` and passes the encoded record to `send`.
       *
       * @param text      text of the new tag
       * @param createdBy id passed through to the event as its creator
       */
      def createTag(text: String, createdBy: UUID): Unit = {
        Logger.debug("Inside TagEventProducer#createTag")

        val tagCreated = TagCreated(UUID.randomUUID(), text, createdBy)
        println(tagCreated)

        send(createLogRecord(tagCreated).encode)
      }


      LOGS



      ```[debug] - application - Inside TagEventProducer#createTag
      TagCreated(d393d223-9eb6-45e3-8610-56a3f65c84cc,scala,f5b61ca0-0ccc-4064-94c1-cba2a5a4087b)
      [debug] - application - Inside ServiceKafkaProducer, writing to tags
      [debug] - application - {"id":"ed27f0d1-6b6c-469b-af97-1929dc6a5cc7","action":"tag-created","data":{"id":"d393d223-9eb6-45e3-8610-56a3f65c84cc","text":"scala","createdBy":"f5b61ca0-0ccc-4064-94c1-cba2a5a4087b"},"timestamp":1542776716868}```


      (edited)
      I am running kafka using the spotify docker image like this:



      # Two services attached to the kafka_net network, which is created under the name my_network.
      version: '3.5'
      services:
        # Kafka broker; ports 9092 and 2181 are published to the host.
        kafka:
          image: 'spotify/kafka'
          hostname: kafka
          environment:
            - ADVERTISED_HOST=kafka
            - ADVERTISED_PORT=9092
          ports:
            - "9092:9092"
            - "2181:2181"
          volumes:
            - /var/run/docker.sock:/var/run/docker.sock
          networks:
            - kafka_net
        # Kafka Manager UI, pointed at kafka:2181 through ZK_HOSTS.
        kafkaManager:
          image: 'sheepkiller/kafka-manager'
          environment:
            - ZK_HOSTS=kafka:2181
            - APPLICATION_SECRET=letmein
          ports:
            - "8000:8000"
          networks:
            - kafka_net
      networks:
        kafka_net:
          name: my_network






      scala apache-kafka akka






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 21 '18 at 8:23









      cchantep

      6,43532034




      6,43532034










      asked Nov 21 '18 at 5:32









      vamsiampoluvamsiampolu

      2,6421046119




      2,6421046119
























          3 Answers
          3






          active

          oldest

          votes


















          2














          First of all, you're using the wrong port (9002 instead of 9092) in the producer. Try using bootstrap.servers = kafka:9092 instead of localhost:9002 in the producer, because your advertised host is set to kafka.






          share|improve this answer































            0














            Set kafka.bootstrap.servers to {kafka_docker_host_api}:9092. If that doesn't fix the problem, try calling producer.flush(). The Kafka producer doesn't send messages immediately, so if you want to force them to be sent, you need to flush.






            share|improve this answer

































              0














              Are you sure that your config for writing to Kafka is as follows?



              bootstrap.servers = "localhost:9002"
              auto.offset.reset = "earliest"


              I think it should be localhost:9092. You might have made a typo here.






              share|improve this answer

























                Your Answer






                // When the "editor" feature is in use, load "externalEditor" and then
                // "snippets", and initialise the snippets module once both are available.
                StackExchange.ifUsing("editor", function () {
                    var initSnippets = function () {
                        StackExchange.snippets.init();
                    };
                    StackExchange.using("externalEditor", function () {
                        StackExchange.using("snippets", initSnippets);
                    });
                }, "code-snippets");

                // On page ready: set up the tag renderer, then create the answer editor
                // once the "externalEditor" module (and "snippets", if enabled) has loaded.
                StackExchange.ready(function() {
                    var channelOptions = {
                        tags: "".split(" "),
                        id: "1"
                    };
                    initTagRenderer("".split(" "), "".split(" "), channelOptions);

                    StackExchange.using("externalEditor", function() {
                        // Have to fire editor after snippets, if snippets enabled
                        if (StackExchange.settings.snippets.snippetsEnabled) {
                            StackExchange.using("snippets", function() {
                                createEditor();
                            });
                        }
                        else {
                            createEditor();
                        }
                    });

                    // Configures the answer editor through StackExchange.prepareEditor.
                    function createEditor() {
                        StackExchange.prepareEditor({
                            heartbeatType: 'answer',
                            autoActivateHeartbeat: false,
                            convertImagesToLinks: true,
                            noModals: true,
                            showLowRepImageUploadWarning: true,
                            reputationToPostImages: 10,
                            bindNavPrevention: true,
                            postfix: "",
                            imageUploader: {
                                // The HTML snippets are escaped (\" and \u003c / \u003e) so that
                                // they stay valid string literals.
                                brandingHtml: "Powered by \u003ca class=\"icon-imgur-white\" href=\"https://imgur.com/\"\u003e\u003c/a\u003e",
                                contentPolicyHtml: "User contributions licensed under \u003ca href=\"https://creativecommons.org/licenses/by-sa/3.0/\"\u003ecc by-sa 3.0 with attribution required\u003c/a\u003e \u003ca href=\"https://stackoverflow.com/legal/content-policy\"\u003e(content policy)\u003c/a\u003e",
                                allowUrls: true
                            },
                            onDemand: true,
                            discardSelector: ".discard-answer",
                            immediatelyShowMarkdownHelp: true
                        });
                    }
                });














                draft saved

                draft discarded


















                // On page ready, pass the login element selector, the URL-encoded address of
                // this question's new-answer anchor and the page type to initPostLogin.
                StackExchange.ready(function () {
                    var loginSelector = '.new-post-login';
                    var encodedQuestionUrl = 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53405782%2fkafka-producer-does-not-write-to-kafka-topic%23new-answer';
                    StackExchange.openid.initPostLogin(loginSelector, encodedQuestionUrl, 'question_page');
                });

                Post as a guest















                Required, but never shown

























                3 Answers
                3






                active

                oldest

                votes








                3 Answers
                3






                active

                oldest

                votes









                active

                oldest

                votes






                active

                oldest

                votes









                2














                First of all, you're using the wrong port (9002 instead of 9092) in the producer. Try using bootstrap.servers = kafka:9092 instead of localhost:9002 in the producer, because your advertised host is set to kafka.






                share|improve this answer




























                  2














                  First of all, you're using the wrong port (9002 instead of 9092) in the producer. Try using bootstrap.servers = kafka:9092 instead of localhost:9002 in the producer, because your advertised host is set to kafka.






                  share|improve this answer


























                    2












                    2








                    2







                    First of all, you're using the wrong port (9002 instead of 9092) in the producer. Try using bootstrap.servers = kafka:9092 instead of localhost:9002 in the producer, because your advertised host is set to kafka.






                    share|improve this answer













                    First of all, you're using the wrong port (9002 instead of 9092) in the producer. Try using bootstrap.servers = kafka:9092 instead of localhost:9002 in the producer, because your advertised host is set to kafka.







                    share|improve this answer












                    share|improve this answer



                    share|improve this answer










                    answered Nov 21 '18 at 5:36









                    BitswazskyBitswazsky

                    6721819




                    6721819

























                        0














                        Set kafka.bootstrap.servers to {kafka_docker_host_api}:9092. If that doesn't fix the problem, try calling producer.flush(). The Kafka producer doesn't send messages immediately, so if you want to force them to be sent, you need to flush.






                        share|improve this answer






























                          0














                          Set kafka.bootstrap.servers to {kafka_docker_host_api}:9092. If that doesn't fix the problem, try calling producer.flush(). The Kafka producer doesn't send messages immediately, so if you want to force them to be sent, you need to flush.






                          share|improve this answer




























                            0












                            0








                            0







                            Set kafka.bootstrap.servers to {kafka_docker_host_api}:9092. If that doesn't fix the problem, try calling producer.flush(). The Kafka producer doesn't send messages immediately, so if you want to force them to be sent, you need to flush.






                            share|improve this answer















                            Set kafka.bootstrap.servers to {kafka_docker_host_api}:9092. If that doesn't fix the problem, try calling producer.flush(). The Kafka producer doesn't send messages immediately, so if you want to force them to be sent, you need to flush.







                            share|improve this answer














                            share|improve this answer



                            share|improve this answer








                            edited Nov 21 '18 at 12:32

























                            answered Nov 21 '18 at 8:51









                            Aleksey IsachenkovAleksey Isachenkov

                            678114




                            678114























                                0














                                Are you sure that your config for writing to Kafka is as follows?



                                bootstrap.servers = "localhost:9002"
                                auto.offset.reset = "earliest"


                                I think it should be localhost:9092. You might have made a typo here.






                                share|improve this answer






























                                  0














                                  Are you sure that your config for writing to Kafka is as follows?



                                  bootstrap.servers = "localhost:9002"
                                  auto.offset.reset = "earliest"


                                  I think it should be localhost:9092. You might have made a typo here.






                                  share|improve this answer




























                                    0












                                    0








                                    0







                                    Are you sure that your config for writing to Kafka is as follows?



                                    bootstrap.servers = "localhost:9002"
                                    auto.offset.reset = "earliest"


                                    I think it should be localhost:9092. You might have made a typo here.






                                    share|improve this answer















                                    Are you sure that your config for writing to Kafka is as follows?



                                    bootstrap.servers = "localhost:9002"
                                    auto.offset.reset = "earliest"


                                    I think it should be localhost:9092. You might have made a typo here.







                                    share|improve this answer














                                    share|improve this answer



                                    share|improve this answer








                                    edited Nov 23 '18 at 10:50









                                    lloiacono

                                    2,10322029




                                    2,10322029










                                    answered Nov 21 '18 at 5:36









                                    Rishikesh GawadeRishikesh Gawade

                                    33




                                    33






























                                        draft saved

                                        draft discarded




















































                                        Thanks for contributing an answer to Stack Overflow!


                                        • Please be sure to answer the question. Provide details and share your research!

                                        But avoid



                                        • Asking for help, clarification, or responding to other answers.

                                        • Making statements based on opinion; back them up with references or personal experience.


                                        To learn more, see our tips on writing great answers.




                                        draft saved


                                        draft discarded














                                        // On page ready, pass the login element selector, the URL-encoded address of
                                        // this question's new-answer anchor and the page type to initPostLogin.
                                        StackExchange.ready(function () {
                                            var loginSelector = '.new-post-login';
                                            var encodedQuestionUrl = 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53405782%2fkafka-producer-does-not-write-to-kafka-topic%23new-answer';
                                            StackExchange.openid.initPostLogin(loginSelector, encodedQuestionUrl, 'question_page');
                                        });

                                        Post as a guest















                                        Required, but never shown





















































                                        Required, but never shown














                                        Required, but never shown












                                        Required, but never shown







                                        Required, but never shown

































                                        Required, but never shown














                                        Required, but never shown












                                        Required, but never shown







                                        Required, but never shown







                                        Popular posts from this blog

                                        Biblatex bibliography style without URLs when DOI exists (in Overleaf with Zotero bibliography)

                                        ComboBox Display Member on multiple fields

                                        Is it possible to collect Nectar points via Trainline?