Kafka:
Start with kafka_2.12-1.0.0:
1) .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties --- ZooKeeper (acts like a repository)
2).\bin\windows\kafka-server-start.bat .\config\server.properties ---Kafka_Server
start with windows:
3)kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
--- (Topic create(test is the topic name))
4)kafka-topics.bat --list --zookeeper localhost:2181 (list of Topics)
5)kafka-console-producer.bat --broker-list localhost:9092 --topic test (producer cmd)
6)kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning (consumer cmd)
-----------------------------------------------------------------------------------------------------------------
D:\kafka_2.12-1.0.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
D:\kafka_2.12-1.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
D:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
D:\kafka_2.12-1.0.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
D:\kafka_2.12-1.0.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
sample code:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  KafkaTransport proxy service.
  Initialises the Kafka connector, publishes to the topic "test"
  (presumably the current message payload; confirm against the connector
  docs), then answers the caller with a JSON document assembled from the
  topic, partition and offset message-context properties.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
  <!-- Failures are routed to the sequence named faultHandlerSeq, which is
       not defined in this sample. -->
  <target faultSequence="faultHandlerSeq">
    <inSequence>
      <!-- Connector configuration: broker address plus String serializers
           for both key and value. NOTE(review): the timeout values are
           presumably milliseconds; confirm in the connector reference. -->
      <kafkaTransport.init>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <maxPoolSize>50</maxPoolSize>
        <requestTimeout>10000</requestTimeout>
        <acks>all</acks>
        <timeout>8000</timeout>
        <metadataFetchTimeout>5000</metadataFetchTimeout>
      </kafkaTransport.init>

      <!-- Publish to the topic named "test". -->
      <kafkaTransport.publishMessages>
        <topic>test</topic>
      </kafkaTransport.publishMessages>

      <!-- Build the JSON reply. $1, $2 and $3 are filled from the args in
           order. NOTE(review): the three context properties are presumably
           set by the publish operation above; verify. -->
      <payloadFactory media-type="json">
        <format>
          {"topic":"$1", "partition":"$2", "offset":"$3"}
        </format>
        <args>
          <arg evaluator="xml" expression="$ctx:topic"/>
          <arg evaluator="xml" expression="$ctx:partition"/>
          <arg evaluator="xml" expression="$ctx:offset"/>
        </args>
      </payloadFactory>

      <!-- Mark the reply as JSON and send it back to the client. -->
      <property name="messageType" scope="axis2" value="application/json"/>
      <respond/>
    </inSequence>
  </target>
  <description/>
</proxy>
--------------------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<!--
  KafkaTransport proxy service (data-service variant).
  Initialises the Kafka connector, replaces the payload with a fixed SOAP
  "select" request, calls the select service, publishes to the Kafka topic
  "test", logs the full message and responds to the caller.

  Fix: the original contained a stray, never closed
  payloadFactory start tag with media-type "json" directly before the real
  XML payloadFactory, which made the document not well-formed. It is removed.

  NOTE(review): the other proxy sample in these notes uses the same name,
  KafkaTransport; presumably only one of them can be deployed at a time.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
  <!-- Failures are routed to the sequence named faultHandlerSeq, which is
       not defined in this sample. -->
  <target faultSequence="faultHandlerSeq">
    <inSequence>
      <!-- Connector configuration: broker address plus String serializers
           for both key and value. NOTE(review): the timeout values are
           presumably milliseconds; confirm in the connector reference. -->
      <kafkaTransport.init>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <maxPoolSize>50</maxPoolSize>
        <requestTimeout>10000</requestTimeout>
        <acks>all</acks>
        <timeout>8000</timeout>
        <metadataFetchTimeout>5000</metadataFetchTimeout>
      </kafkaTransport.init>

      <!-- Static SOAP request for the "select" operation with Id 123;
           no arguments are substituted. -->
      <payloadFactory media-type="xml">
        <format>
          <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:dat="http://ws.wso2.org/dataservice">
            <soapenv:Header/>
            <soapenv:Body>
              <dat:select>
                <dat:Id>123</dat:Id>
              </dat:select>
            </soapenv:Body>
          </soapenv:Envelope>
        </format>
        <args/>
      </payloadFactory>

      <!-- Send the request as text/xml. -->
      <property name="messageType" scope="axis2" type="STRING" value="text/xml"/>
      <property name="ContentType" scope="axis2" type="STRING" value="text/xml"/>

      <!-- POST to the select service through the local:// URI. -->
      <call>
        <endpoint>
          <http method="post" uri-template="local:///services/select/"/>
        </endpoint>
      </call>

      <!-- Publish to the topic "test". NOTE(review): at this point the
           payload is presumably the response of the call above; verify. -->
      <kafkaTransport.publishMessages>
        <topic>test</topic>
      </kafkaTransport.publishMessages>

      <log level="full"/>
      <respond/>
    </inSequence>
  </target>
  <description/>
</proxy>
Start with kafka_2.12-1.0.0:
1) .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties --- ZooKeeper (acts like a repository)
2).\bin\windows\kafka-server-start.bat .\config\server.properties ---Kafka_Server
start with windows:
3)kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
--- (Topic create(test is the topic name))
4)kafka-topics.bat --list --zookeeper localhost:2181 (list of Topics)
5)kafka-console-producer.bat --broker-list localhost:9092 --topic test (producer cmd)
6)kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning (consumer cmd)
-----------------------------------------------------------------------------------------------------------------
D:\kafka_2.12-1.0.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
D:\kafka_2.12-1.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties
D:\kafka_2.12-1.0.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
D:\kafka_2.12-1.0.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
D:\kafka_2.12-1.0.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
sample code:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  KafkaTransport proxy service.
  Initialises the Kafka connector, publishes to the topic "test"
  (presumably the current message payload; confirm against the connector
  docs), then answers the caller with a JSON document assembled from the
  topic, partition and offset message-context properties.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
  <!-- Failures are routed to the sequence named faultHandlerSeq, which is
       not defined in this sample. -->
  <target faultSequence="faultHandlerSeq">
    <inSequence>
      <!-- Connector configuration: broker address plus String serializers
           for both key and value. NOTE(review): the timeout values are
           presumably milliseconds; confirm in the connector reference. -->
      <kafkaTransport.init>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <maxPoolSize>50</maxPoolSize>
        <requestTimeout>10000</requestTimeout>
        <acks>all</acks>
        <timeout>8000</timeout>
        <metadataFetchTimeout>5000</metadataFetchTimeout>
      </kafkaTransport.init>

      <!-- Publish to the topic named "test". -->
      <kafkaTransport.publishMessages>
        <topic>test</topic>
      </kafkaTransport.publishMessages>

      <!-- Build the JSON reply. $1, $2 and $3 are filled from the args in
           order. NOTE(review): the three context properties are presumably
           set by the publish operation above; verify. -->
      <payloadFactory media-type="json">
        <format>
          {"topic":"$1", "partition":"$2", "offset":"$3"}
        </format>
        <args>
          <arg evaluator="xml" expression="$ctx:topic"/>
          <arg evaluator="xml" expression="$ctx:partition"/>
          <arg evaluator="xml" expression="$ctx:offset"/>
        </args>
      </payloadFactory>

      <!-- Mark the reply as JSON and send it back to the client. -->
      <property name="messageType" scope="axis2" value="application/json"/>
      <respond/>
    </inSequence>
  </target>
  <description/>
</proxy>
--------------------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<!--
  KafkaTransport proxy service (data-service variant).
  Initialises the Kafka connector, replaces the payload with a fixed SOAP
  "select" request, calls the select service, publishes to the Kafka topic
  "test", logs the full message and responds to the caller.

  Fix: the original contained a stray, never closed
  payloadFactory start tag with media-type "json" directly before the real
  XML payloadFactory, which made the document not well-formed. It is removed.

  NOTE(review): the other proxy sample in these notes uses the same name,
  KafkaTransport; presumably only one of them can be deployed at a time.
-->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
  <!-- Failures are routed to the sequence named faultHandlerSeq, which is
       not defined in this sample. -->
  <target faultSequence="faultHandlerSeq">
    <inSequence>
      <!-- Connector configuration: broker address plus String serializers
           for both key and value. NOTE(review): the timeout values are
           presumably milliseconds; confirm in the connector reference. -->
      <kafkaTransport.init>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <maxPoolSize>50</maxPoolSize>
        <requestTimeout>10000</requestTimeout>
        <acks>all</acks>
        <timeout>8000</timeout>
        <metadataFetchTimeout>5000</metadataFetchTimeout>
      </kafkaTransport.init>

      <!-- Static SOAP request for the "select" operation with Id 123;
           no arguments are substituted. -->
      <payloadFactory media-type="xml">
        <format>
          <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:dat="http://ws.wso2.org/dataservice">
            <soapenv:Header/>
            <soapenv:Body>
              <dat:select>
                <dat:Id>123</dat:Id>
              </dat:select>
            </soapenv:Body>
          </soapenv:Envelope>
        </format>
        <args/>
      </payloadFactory>

      <!-- Send the request as text/xml. -->
      <property name="messageType" scope="axis2" type="STRING" value="text/xml"/>
      <property name="ContentType" scope="axis2" type="STRING" value="text/xml"/>

      <!-- POST to the select service through the local:// URI. -->
      <call>
        <endpoint>
          <http method="post" uri-template="local:///services/select/"/>
        </endpoint>
      </call>

      <!-- Publish to the topic "test". NOTE(review): at this point the
           payload is presumably the response of the call above; verify. -->
      <kafkaTransport.publishMessages>
        <topic>test</topic>
      </kafkaTransport.publishMessages>

      <log level="full"/>
      <respond/>
    </inSequence>
  </target>
  <description/>
</proxy>
No comments:
Post a Comment