Storm SQL Integration

Storm SQL integration allows users to run SQL queries on Storm streaming data. For streaming analytics, a SQL interface not only speeds up the development cycle, but also opens up opportunities to unify batch processing (for example with Apache Hive) and real-time stream processing.

StormSQL compiles high-level SQL queries into Trident topologies and runs them on the Storm cluster. This article shows users how to use StormSQL. Readers interested in the details of the StormSQL design and implementation can refer to the design documentation.

Usage

The storm sql command compiles SQL statements into a Trident topology and submits it to the Storm cluster.

 

$ bin/storm sql <sql-file> <topo-name>

Here sql-file is the file containing the SQL statements to be executed, and topo-name is the name of the submitted topology.

Supported features

In the current release (1.0.1), the following features are supported:

  • Reading from and writing to external data sources as streams
  • Filtering tuples
  • Projections

Specifying an external data source

In StormSQL, data is represented in the form of external tables. Users can use the CREATE EXTERNAL TABLE statement to specify a data source. The syntax of CREATE EXTERNAL TABLE closely follows the definition in the Hive Data Definition Language.

 

CREATE EXTERNAL TABLE table_name field_list
    [ STORED AS
      INPUTFORMAT input_format_classname
      OUTPUTFORMAT output_format_classname
    ]
    LOCATION location
    [ TBLPROPERTIES tbl_properties ]
    [ AS select_stmt ]

You can find a detailed explanation of each attribute in the Hive Data Definition Language. For example, the following statement specifies a Kafka spout and sink:

 
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'

Plugging in external data sources

To plug in an external data source, users implement the ISqlTridentDataSource interface and register the implementation through Java's service loader mechanism. The external data source is then selected based on the scheme of the URI in the table's LOCATION. Please refer to storm-sql-kafka for more implementation details.
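As a rough illustration of the service loader mechanism mentioned above, the sketch below uses stand-in names (StreamDataSource, KafkaStreamDataSource, and DataSourceRegistry are hypothetical, not the actual StormSQL API; the real contract is ISqlTridentDataSource, with storm-sql-kafka as the reference implementation). It only demonstrates how an implementation is discovered on the classpath through a provider file.

// Minimal ServiceLoader sketch; all names are hypothetical stand-ins for the StormSQL API.
import java.util.ServiceLoader;

// Stand-in for the data source contract (the real one is ISqlTridentDataSource).
interface StreamDataSource {
    // The URI scheme this source handles, e.g. "kafka" for kafka://host:port/...
    String scheme();
}

// Stand-in implementation; a real provider would be a public class shipped in its own jar.
class KafkaStreamDataSource implements StreamDataSource {
    public String scheme() { return "kafka"; }
}

public class DataSourceRegistry {
    public static void main(String[] args) {
        // ServiceLoader instantiates every implementation listed in the provider file
        // META-INF/services/StreamDataSource on the classpath (one class name per line).
        for (StreamDataSource source : ServiceLoader.load(StreamDataSource.class)) {
            System.out.println("registered data source for scheme: " + source.scheme());
        }
    }
}

In StormSQL's case, the scheme of the URI in the table's LOCATION (kafka in the example above) is what selects the registered implementation.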

Example: filtering a Kafka data stream

Suppose there is a Kafka stream that stores transaction orders. Each message in the stream contains the order id, the unit price of the product, and the order quantity. Our goal is to filter out the orders with a large transaction amount and insert them into another Kafka stream for further analysis.

Users can specify the following SQL statements in the SQL file:

CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'

CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'

INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50


The ORDERS table defined by the first statement represents the input stream. Its LOCATION specifies the ZooKeeper address (localhost:2181), the path of the brokers in ZooKeeper (/brokers), and the topic (orders). The TBLPROPERTIES field specifies the configuration of the KafkaProducer.
In the current storm-sql-kafka implementation, the LOCATION and TBLPROPERTIES items must be specified even if the table is only read from or only written to.
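To spell that out, the LOCATION URI of the ORDERS table breaks down as follows (an annotation of what the statement above already encodes):

kafka://localhost:2181/brokers?topic=orders
  scheme      kafka            selects the registered storm-sql-kafka data source
  host:port   localhost:2181   ZooKeeper address
  path        /brokers         path of the brokers in ZooKeeper
  query       topic=orders     Kafka topic to read from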

The LARGE_ORDERS table defined by the second statement similarly represents the output stream. The third statement, the SELECT, defines the topology: it makes StormSQL filter all orders in the external table ORDERS by their total price and insert the qualifying records into the LARGE_ORDERS Kafka stream.
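As a concrete illustration, and assuming (as noted under the current limitations below) that the Kafka connector exchanges JSON messages whose fields correspond to the column names, a qualifying order and the record produced for it might look roughly like this; the exact field layout is an assumption of this sketch:

{"ID": 1, "UNIT_PRICE": 20, "QUANTITY": 3}    message on the orders topic; 20 * 3 = 60 > 50
{"ID": 1, "TOTAL": 60}                        corresponding message on the large_orders topic

An order with UNIT_PRICE 10 and QUANTITY 4 (TOTAL 40) would be filtered out.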

To run this example, the user needs to include the data source (storm-sql-kafka in this example) and its dependencies in the classpath. One way is to put the required jars in the extlib directory:

 




$ cp curator-client-2.5.0.jar curator-framework-2.5.0.jar zookeeper-3.4.6.jar extlib/
$ cp scala-library-2.10.4.jar kafka-clients-0.8.2.1.jar kafka_2.10-0.8.2.1.jar metrics-core-2.2.0.jar extlib/
$ cp json-simple-1.1.1.jar extlib/
$ cp jackson-annotations-2.6.0.jar extlib/
$ cp storm-kafka-*.jar storm-sql-kafka-*.jar storm-sql-runtime-*.jar extlib/


Next, submit the SQL statement to StormSQL:

 




$ bin/storm sql order_filtering.sql order_filtering


Now you should be able to see the order_filtering topology in the Storm UI.

Current limitations

Aggregation, windowing, and joins have not yet been implemented. Specifying the parallelism of the topology is not currently supported; all processing tasks run with a parallelism of 1.

Users also need to place the external data source dependencies in the extlib directory; otherwise the topology will fail to run because of a ClassNotFoundException.

The current Kafka connector in StormSQL assumes that both input and output data are in JSON format. The connector does not yet support the INPUTFORMAT and OUTPUTFORMAT clauses.
