Saturday, December 14, 2019

KafkaTransport in WSO2 ESB

Kafka: 

Start with kafka_2.12-1.0.0:

1) .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties   (starts ZooKeeper, which acts like a repository for Kafka)

2) .\bin\windows\kafka-server-start.bat .\config\server.properties   (starts the Kafka server)


Then run the following from the bin\windows directory:

3) kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
   (creates a topic; "test" is the topic name)

4) kafka-topics.bat --list --zookeeper localhost:2181   (lists the existing topics)

5) kafka-console-producer.bat --broker-list localhost:9092 --topic test   (producer command)

6) kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning   (consumer command)

-----------------------------------------------------------------------------------------------------------------

D:\kafka_2.12-1.0.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

D:\kafka_2.12-1.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

D:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

D:\kafka_2.12-1.0.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test

D:\kafka_2.12-1.0.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning


sample code:


<?xml version="1.0" encoding="UTF-8"?>
<!--
  Synapse proxy service "KafkaTransport", exposed over http and https.
  Its in-sequence configures the Kafka connector, publishes to the topic
  "test", builds a JSON payload from three message-context properties and
  responds to the caller.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="http,https"
       startOnLoad="true"
       statistics="disable"
       trace="disable">
    <!-- Faults are routed to the sequence named "faultHandlerSeq", which is defined elsewhere. -->
    <target faultSequence="faultHandlerSeq">
        <inSequence>
            <!-- Connector configuration: broker address, String serializers, pool size, acks and timeouts. -->
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <maxPoolSize>50</maxPoolSize>
                <requestTimeout>10000</requestTimeout>
                <acks>all</acks>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
            </kafkaTransport.init>

            <!-- Publish to the topic "test" (presumably the current message body; confirm against the connector docs). -->
            <kafkaTransport.publishMessages>
                <topic>test</topic>
            </kafkaTransport.publishMessages>

            <!--
              Build the JSON reply. $1..$3 are filled from $ctx:topic, $ctx:partition and
              $ctx:offset; these are presumably set by the publish operation (TODO confirm).
              The text inside <format> is left exactly as authored because it is template data.
            -->
            <payloadFactory media-type="json">
                <format>
                {"topic":"$1", "partition":"$2", "offset":"$3"}
          </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>

            <!-- Mark the outgoing message as JSON, then send it back to the client. -->
            <property name="messageType" scope="axis2" value="application/json"/>
            <respond/>
        </inSequence>
    </target>
    <description/>
</proxy>

--------------------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Synapse proxy service "KafkaTransport", exposed over http and https.
  Its in-sequence configures the Kafka connector, replaces the message with a
  fixed SOAP "select" request, calls the endpoint at local:///services/select/,
  publishes to the topic "test", logs the message and responds.

  NOTE(review): this proxy uses the same name as the other sample in this
  document; presumably only one of them can be deployed at a time. Rename one
  if both are needed.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <!-- Faults are routed to the sequence named "faultHandlerSeq", which is defined elsewhere. -->
   <target faultSequence="faultHandlerSeq">
      <inSequence>
         <!-- Connector configuration: broker address, String serializers, pool size, acks and timeouts. -->
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <maxPoolSize>50</maxPoolSize>
            <requestTimeout>10000</requestTimeout>
            <acks>all</acks>
            <timeout>8000</timeout>
            <metadataFetchTimeout>5000</metadataFetchTimeout>
         </kafkaTransport.init>

         <!--
           Static SOAP request used as the new message body. A stray, never-closed
           payloadFactory start tag with media-type "json" used to precede this
           element and made the document ill-formed; only the XML one is kept.
         -->
         <payloadFactory media-type="xml">
            <format>
               <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                                 xmlns:dat="http://ws.wso2.org/dataservice">
                  <soapenv:Header/>
                  <soapenv:Body>
                     <dat:select>
                        <dat:Id>123</dat:Id>
                     </dat:select>
                  </soapenv:Body>
               </soapenv:Envelope>
            </format>
            <args/>
         </payloadFactory>

         <!-- Mark the outgoing request as text/xml before calling the backend. -->
         <property name="messageType" scope="axis2" type="STRING" value="text/xml"/>
         <property name="ContentType" scope="axis2" type="STRING" value="text/xml"/>

         <!-- POST the request to the "select" service. -->
         <call>
            <endpoint>
               <http method="post" uri-template="local:///services/select/"/>
            </endpoint>
         </call>

         <!-- Publish to the topic "test" (presumably the backend response now in the message; confirm). -->
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>

         <!-- Log the full message, then send it back to the client. -->
         <log level="full"/>
         <respond/>
      </inSequence>
   </target>
   <description/>
</proxy>
                               
                                

No comments:

Post a Comment