4 Replies Latest reply on Sep 17, 2015 4:00 AM by Taimoor

    Apache Kafka and Intel Galileo

    Taimoor

      Hey Guys!!

       

      I'm trying to write a simple Kafka client on the Intel Galileo Gen2 board. I have set up a simple server at the example IP given in the code. The code works well on my PC, but running it on the Gen2 board produces errors (see the end of this post).

       

      from kafka import SimpleProducer, KafkaClient
      
      kafka = KafkaClient('172.168.0.111:9092')
      
      
      producer = SimpleProducer(kafka)
      
      # Note that the application is responsible for encoding messages to type bytes
      producer.send_messages(b'test', b'some message')
      producer.send_messages(b'test', b'this method', b'is variadic')
      

       

      I see the following errors:

      No handlers could be found for logger "kafka.conn"
      Traceback (most recent call last):
        File "testKafka.py", line 10, in <module>
          producer.send_messages(b'test', b'some message')
        File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 54, in send_messages
          topic, partition, *msg
        File "/usr/lib/python2.7/site-packages/kafka/producer/base.py", line 349, in send_messages
          return self._send_messages(topic, partition, *msg)
        File "/usr/lib/python2.7/site-packages/kafka/producer/base.py", line 390, in _send_messages
          fail_on_error=self.sync_fail_on_error
        File "/usr/lib/python2.7/site-packages/kafka/client.py", line 480, in send_produce_request
          (not fail_on_error or not self._raise_on_response_error(resp))]
        File "/usr/lib/python2.7/site-packages/kafka/client.py", line 247, in _raise_on_response_error
          raise resp
      kafka.common.FailedPayloadsError
      

       

      Do you have any idea how I can resolve this problem?

       

      Has anyone else run a Kafka producer/consumer (maybe even in C or node)?

       

      Thank you!

      Taimoor

        • 1. Re: Apache Kafka and Intel Galileo
          Taimoor

          So, an update: I have not managed to send messages to a topic from the Galileo board.

           

           

          BUT, I performed the exact same steps for the Intel Edison and I can send messages... What could be the reason? How can I debug?

           

          The Python versions are the same as far as I can see... I ran pip install kafka-python in both cases.

           

          root@galileo201:~# pip show python
          ---
          Metadata-Version: 1.1
          Name: Python
          Version: 2.7.3
          Summary: A high-level object-oriented programming language
          Home-page: http://www.python.org/2.7
          Author: Guido van Rossum and the Python community
          Author-email: python-dev@python.org
          License: PSF license
          Location: /usr/lib/python2.7/lib-dynload
          Requires:
          

           

           

          root@edison:~# pip show python
          ---
          Metadata-Version: 1.1
          Name: Python
          Version: 2.7.3
          Summary: A high-level object-oriented programming language
          Home-page: http://www.python.org/2.7
          Author: Guido van Rossum and the Python community
          Author-email: python-dev@python.org
          License: PSF license
          Location: /usr/lib/python2.7/lib-dynload
          Requires: 
          
          • 2. Re: Apache Kafka and Intel Galileo
            Taimoor

            I modified the Python code to enable debug logging:

             

            from kafka import SimpleProducer, KafkaClient
            import logging
            
            logging.basicConfig(
                    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
                    level=logging.DEBUG
                    )
            kafka = KafkaClient('172.168.0.111:9092')
            
            
            producer = SimpleProducer(kafka)
            
            # Note that the application is responsible for encoding messages to type bytes
            producer.send_messages(b'test', b'some message')
            producer.send_messages(b'test', b'this method', b'is variadic')
            

             

            and the error logs are...

             

            root@galileo201:~# python testKafka.py
            2015-09-15 11:33:07,724.724.637985229:kafka.client:-1216420096:DEBUG:235:Request 1: []
            2015-09-15 11:33:07,730.730.56602478:kafka.conn:-1216420096:DEBUG:235:Reinitializing socket connection for 172.168.0.111:9092
            2015-09-15 11:33:07,739.739.641904831:kafka.conn:-1216420096:DEBUG:235:About to send 30 bytes to Kafka, request 1
            2015-09-15 11:33:07,744.744.97795105:kafka.conn:-1216420096:DEBUG:235:Reading response 1 from Kafka
            2015-09-15 11:33:07,750.750.304937363:kafka.conn:-1216420096:DEBUG:235:About to read 4 bytes from Kafka
            2015-09-15 11:33:07,755.755.7721138:kafka.conn:-1216420096:DEBUG:235:Read 4/4 bytes from Kafka
            2015-09-15 11:33:07,761.761.053085327:kafka.conn:-1216420096:DEBUG:235:About to read 81 bytes from Kafka
            2015-09-15 11:33:07,766.766.630887985:kafka.conn:-1216420096:DEBUG:235:Read 81/81 bytes from Kafka
            2015-09-15 11:33:07,773.773.736000061:kafka.client:-1216420096:DEBUG:235:Response 1: MetadataResponse(brokers=[BrokerMetadata(nodeId=0, host='XXXX', port=9092)], topics=[TopicMetadata(topic='test', error=0, partitions=[PartitionMetadata(topic='test', partition=0, leader=0, replicas=(0,), isr=(0,), error=0)])])
            2015-09-15 11:33:07,780.780.160903931:kafka.client:-1216420096:INFO:235:Updating broker metadata: [BrokerMetadata(nodeId=0, host='XXXX', port=9092)]
            2015-09-15 11:33:07,785.785.665988922:kafka.client:-1216420096:INFO:235:Updating topic metadata: [TopicMetadata(topic='test', error=0, partitions=[PartitionMetadata(topic='test', partition=0, leader=0, replicas=(0,), isr=(0,), error=0)])]
            2015-09-15 11:33:07,798.798.206090927:kafka.client:-1216420096:DEBUG:235:Request 2 to BrokerMetadata(nodeId=0, host='XXXX', port=9092): [ProduceRequest(topic='test', partition=0, messages=[Message(magic=0, attributes=0, key=None, value='some message')])]
            2015-09-15 11:33:07,808.808.684110641:kafka.conn:-1216420096:DEBUG:235:Reinitializing socket connection for XXXX:9092
            2015-09-15 11:33:08,129.129.983901978:kafka.conn:-1216420096:ERROR:235:Unable to connect to kafka broker at XXXX:9092
            Traceback (most recent call last):
              File "/usr/lib/python2.7/site-packages/kafka/conn.py", line 211, in reinit
                self._sock = socket.create_connection((self.host, self.port), self.timeout)
              File "/usr/lib/python2.7/socket.py", line 553, in create_connection
                for res in getaddrinfo(host, port, 0, SOCK_STREAM):
            gaierror: [Errno -2] Name or service not known
            2015-09-15 11:33:08,159.159.049034119:kafka.client:-1216420096:WARNING:235:ConnectionError attempting to send request 2 to server BrokerMetadata(nodeId=0, host='XXXX', port=9092): Kafka @ XXXX:9092 went away
            2015-09-15 11:33:08,165.165.437936783:kafka.producer:-1216420096:ERROR:235:Unable to send messages
            Traceback (most recent call last):
              File "/usr/lib/python2.7/site-packages/kafka/producer/base.py", line 390, in _send_messages
                fail_on_error=self.sync_fail_on_error
              File "/usr/lib/python2.7/site-packages/kafka/client.py", line 480, in send_produce_request
                (not fail_on_error or not self._raise_on_response_error(resp))]
              File "/usr/lib/python2.7/site-packages/kafka/client.py", line 247, in _raise_on_response_error
                raise resp
            FailedPayloadsError
            Traceback (most recent call last):
              File "testKafka.py", line 14, in <module>
                producer.send_messages(b'test', b'some message')
              File "/usr/lib/python2.7/site-packages/kafka/producer/simple.py", line 54, in send_messages
                topic, partition, *msg
              File "/usr/lib/python2.7/site-packages/kafka/producer/base.py", line 349, in send_messages
                return self._send_messages(topic, partition, *msg)
              File "/usr/lib/python2.7/site-packages/kafka/producer/base.py", line 390, in _send_messages
                fail_on_error=self.sync_fail_on_error
              File "/usr/lib/python2.7/site-packages/kafka/client.py", line 480, in send_produce_request
                (not fail_on_error or not self._raise_on_response_error(resp))]
              File "/usr/lib/python2.7/site-packages/kafka/client.py", line 247, in _raise_on_response_error
                raise resp
            kafka.common.FailedPayloadsError
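
The gaierror in the log above suggests the board could not resolve the hostname that the broker advertised in its metadata response (masked as XXXX), even though the initial connection by IP address worked. A minimal sketch for checking this from the board, using only the standard socket module; the helper name can_resolve and the two sample hosts are illustrative assumptions, not part of kafka-python:

```python
import socket


def can_resolve(host, port):
    """Return True if the local resolver can turn host into an address."""
    try:
        # Same kind of lookup that socket.create_connection performs.
        socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    # Replace these sample hosts with the bootstrap address and the host
    # shown in BrokerMetadata(...) in the debug log.
    for host in ("127.0.0.1", "no-such-host.invalid"):
        print("%s resolves: %s" % (host, can_resolve(host, 9092)))
```

If the advertised name fails this check, the produce request will likely fail the same way. Possible remedies are making the name resolvable on the board (for example with an /etc/hosts entry) or having the broker advertise an address the board can reach.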
            
            
            • 3. Re: Apache Kafka and Intel Galileo
              Taimoor

              So, an update... I did a fresh setup of the OS on another Galileo board, and Kafka messages now seem to get sent. Here's an updated log:

               

              2015-09-16 15:40:22,265.265.372991562:kafka.client:-1216563456:DEBUG:2874:Request 1: []
              2015-09-16 15:40:22,271.271.461009979:kafka.conn:-1216563456:DEBUG:2874:Reinitializing socket connection for 172.168.0.111:9092
              2015-09-16 15:40:22,280.280.299901962:kafka.conn:-1216563456:DEBUG:2874:About to send 30 bytes to Kafka, request 1
              2015-09-16 15:40:22,285.285.419940948:kafka.conn:-1216563456:DEBUG:2874:Reading response 1 from Kafka
              2015-09-16 15:40:22,291.291.044950485:kafka.conn:-1216563456:DEBUG:2874:About to read 4 bytes from Kafka
              2015-09-16 15:40:22,296.296.293973923:kafka.conn:-1216563456:DEBUG:2874:Read 4/4 bytes from Kafka
              2015-09-16 15:40:22,301.301.502943039:kafka.conn:-1216563456:DEBUG:2874:About to read 81 bytes from Kafka
              2015-09-16 15:40:22,306.306.515932083:kafka.conn:-1216563456:DEBUG:2874:Read 81/81 bytes from Kafka
              2015-09-16 15:40:22,313.313.81893158:kafka.client:-1216563456:DEBUG:2874:Response 1: MetadataResponse(brokers=[BrokerMetadata(nodeId=0, host='intranav-ThinkPad-T61', port=9092)], topics=[TopicMetadata(topic='test', error=0, partitions=[PartitionMetadata(topic='test', partition=0, leader=0, replicas=(0,), isr=(0,), error=0)])])
              2015-09-16 15:40:22,319.319.814920425:kafka.client:-1216563456:INFO:2874:Updating broker metadata: [BrokerMetadata(nodeId=0, host='intranav-ThinkPad-T61', port=9092)]
              2015-09-16 15:40:22,324.324.934005737:kafka.client:-1216563456:INFO:2874:Updating topic metadata: [TopicMetadata(topic='test', error=0, partitions=[PartitionMetadata(topic='test', partition=0, leader=0, replicas=(0,), isr=(0,), error=0)])]
              2015-09-16 15:40:22,336.336.472988129:kafka.client:-1216563456:DEBUG:2874:Request 2 to BrokerMetadata(nodeId=0, host='intranav-ThinkPad-T61', port=9092): [ProduceRequest(topic='test', partition=0, messages=[Message(magic=0, attributes=0, key=None, value='This is me (Taimoor) here')])]
              2015-09-16 15:40:22,347.347.311973572:kafka.conn:-1216563456:DEBUG:2874:Reinitializing socket connection for intranav-ThinkPad-T61:9092
              2015-09-16 15:40:27,786.786.072015762:kafka.conn:-1216563456:DEBUG:2874:About to send 105 bytes to Kafka, request 2
              2015-09-16 15:40:27,791.791.611909866:kafka.conn:-1216563456:DEBUG:2874:Reading response 2 from Kafka
              2015-09-16 15:40:27,796.796.857118607:kafka.conn:-1216563456:DEBUG:2874:About to read 4 bytes from Kafka
              2015-09-16 15:40:27,802.802.403926849:kafka.conn:-1216563456:DEBUG:2874:Read 4/4 bytes from Kafka
              2015-09-16 15:40:27,807.807.549953461:kafka.conn:-1216563456:DEBUG:2874:About to read 32 bytes from Kafka
              2015-09-16 15:40:27,813.813.091039658:kafka.conn:-1216563456:DEBUG:2874:Read 32/32 bytes from Kafka
              2015-09-16 15:40:27,819.819.006919861:kafka.client:-1216563456:DEBUG:2874:Response 2: [ProduceResponse(topic='test', partition=0, error=0, offset=2142)]
              2015-09-16 15:40:27,826.826.735973358:kafka.client:-1216563456:DEBUG:2874:Request 3 to BrokerMetadata(nodeId=0, host='intranav-ThinkPad-T61', port=9092): [ProduceRequest(topic='test', partition=0, messages=[Message(magic=0, attributes=0, key=None, value='this method'), Message(magic=0, attributes=0, key=None, value='is variadic')])]
              2015-09-16 15:40:27,836.836.637020111:kafka.conn:-1216563456:DEBUG:2874:About to send 128 bytes to Kafka, request 3
              2015-09-16 15:40:27,841.841.866970062:kafka.conn:-1216563456:DEBUG:2874:Reading response 3 from Kafka
              2015-09-16 15:40:27,847.847.053050995:kafka.conn:-1216563456:DEBUG:2874:About to read 4 bytes from Kafka
              2015-09-16 15:40:27,852.852.936029434:kafka.conn:-1216563456:DEBUG:2874:Read 4/4 bytes from Kafka
              2015-09-16 15:40:27,857.857.991933823:kafka.conn:-1216563456:DEBUG:2874:About to read 32 bytes from Kafka
              2015-09-16 15:40:27,863.863.610982895:kafka.conn:-1216563456:DEBUG:2874:Read 32/32 bytes from Kafka
              2015-09-16 15:40:27,869.869.436979294:kafka.client:-1216563456:DEBUG:2874:Response 3: [ProduceResponse(topic='test', partition=0, error=0, offset=2143)]
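
In this log, roughly five seconds pass between "Reinitializing socket connection for intranav-ThinkPad-T61:9092" (15:40:22) and the first send of request 2 (15:40:27). That may point to slow name resolution on this board, though the log alone does not prove it. A rough sketch for timing the lookup separately from Kafka; timed_lookup is a hypothetical helper name:

```python
import socket
import time


def timed_lookup(host, port):
    """Return (seconds_taken, resolved_ok) for one getaddrinfo call."""
    start = time.time()
    try:
        socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
        ok = True
    except socket.gaierror:
        ok = False
    return time.time() - start, ok


if __name__ == "__main__":
    # Sample host only; substitute the broker host name from the log.
    seconds, ok = timed_lookup("127.0.0.1", 9092)
    print("resolved=%s in %.3f s" % (ok, seconds))
```

Comparing the timing for the broker's hostname against its IP address on each board would show whether the delay comes from the resolver or from somewhere else.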
              

               

              Can someone help me figure out the problem???

               

              Thank you very much!!

              • 4. Re: Apache Kafka and Intel Galileo
                Taimoor

                Figured it out...

                 

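                If you change the server configuration so that the broker binds to its IP address, it fixes everything. The debug log in my earlier reply shows why: the broker advertised its hostname in the metadata response, and the failing Galileo could not resolve that name (gaierror: Name or service not known). I'm not sure why I wasn't getting this error on my other computers, the Edisons, and one of the Galileos... But this seems like a logical fix!!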
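
One way to confirm the change took effect is to check that the host shown in BrokerMetadata(...) is now an IP literal rather than a machine name. A small sketch, assuming the broker advertises the address it binds to; is_ip_literal is a hypothetical helper, and the two sample values come from the logs earlier in this thread:

```python
import socket


def is_ip_literal(host):
    """Return True if host is an IPv4 or IPv6 address, not a name."""
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, host)
            return True
        except (socket.error, ValueError):
            # Not a valid address for this family; try the next one.
            continue
    return False


if __name__ == "__main__":
    for host in ("172.168.0.111", "intranav-ThinkPad-T61"):
        print("%s is an IP literal: %s" % (host, is_ip_literal(host)))
```

If the metadata still shows a machine name after the change, the client will again depend on each board's resolver, which is where the original failure came from.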

                 

                Thanks!!