The Storm SQL integration allows users to run SQL queries over streaming data in Storm. For streaming analytics, a SQL interface not only speeds up the development cycle, but also opens up opportunities to unify batch processing (for example with Apache Hive) and real-time streaming data processing.
StormSQL compiles high-level SQL queries into Trident topologies and runs them on the Storm cluster. This article shows how to use StormSQL; readers interested in the details of the StormSQL design and implementation can refer to the StormSQL design documentation.
Usage
Run the storm sql command to compile SQL statements into a Trident topology and submit it to the Storm cluster:
```
$ bin/storm sql <sql-file> <topo-name>
```
Here sql-file contains the SQL statements to be executed, and topo-name is the name of the submitted topology.
Supported functions
In the current version (1.0.1), the following features are supported:
- Streaming reads from and writes to external data sources
- Filtering tuples
- Projections
Specifying external data sources
In StormSQL, data is represented as external tables. Users can specify a data source with the CREATE EXTERNAL TABLE statement. The syntax of CREATE EXTERNAL TABLE closely follows the definition in the Hive Data Definition Language:
```
CREATE EXTERNAL TABLE table_name field_list
    [ STORED AS
      INPUTFORMAT input_format_classname
      OUTPUTFORMAT output_format_classname
    ]
    LOCATION location
    [ TBLPROPERTIES tbl_properties ]
    [ AS select_stmt ]
```
You can find a detailed explanation of each property in the Hive Data Definition Language. For example, the following statement specifies a Kafka spout and sink:
```
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
```
Plugging in external data sources
To plug in an external data source, users implement the ISqlTridentDataSource interface and register the implementation through Java's service loader mechanism. The external data source is selected based on the scheme of the URI in the table definition. Please refer to storm-sql-kafka for more implementation details.
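Registration uses Java's standard ServiceLoader mechanism: the data-source jar ships a provider class together with a service descriptor under META-INF/services, and StormSQL picks the provider whose URI scheme matches the LOCATION of the table. The sketch below only illustrates this generic pattern; the DataSourcesProvider interface shown here, its methods, and the demo scheme are simplified, hypothetical stand-ins for the real types in storm-sql-runtime, so check storm-sql-kafka for the actual interface to implement and the exact service file name.
```
import java.net.URI;
import java.util.ServiceLoader;

// Simplified stand-in for the provider interface shipped by storm-sql-runtime.
// The real interface (and the ISqlTridentDataSource it constructs) takes more
// parameters; see storm-sql-kafka for a complete implementation.
interface DataSourcesProvider {
    // URI scheme this provider handles, e.g. "kafka" for kafka://host:port/...
    String scheme();

    // Build the data source for one table; a real provider returns an
    // ISqlTridentDataSource and also receives the field list and formats.
    Object construct(URI location, String tblProperties);
}

// Hypothetical provider for tables declared with LOCATION 'demo://...'.
public class DemoDataSourcesProvider implements DataSourcesProvider {
    @Override
    public String scheme() {
        return "demo";
    }

    @Override
    public Object construct(URI location, String tblProperties) {
        // An actual provider would return an ISqlTridentDataSource exposing
        // a Trident spout (producer side) and a sink (consumer side).
        throw new UnsupportedOperationException("illustration only");
    }
}

// The lookup StormSQL effectively performs: iterate the providers found by
// the service loader and pick the one whose scheme matches the table URI.
class ProviderLookup {
    static DataSourcesProvider forLocation(URI location) {
        for (DataSourcesProvider provider : ServiceLoader.load(DataSourcesProvider.class)) {
            if (provider.scheme().equals(location.getScheme())) {
                return provider;
            }
        }
        throw new IllegalArgumentException("No data source provider for " + location);
    }
}
```
For the service loader to discover the provider, the jar must also contain a descriptor file named after the fully qualified provider interface (something like META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider in the real module) whose content is the fully qualified name of the implementing class.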
Example: filtering a Kafka data stream
Suppose there is a Kafka data stream storing transaction order data. Each message in the stream contains the id of the order, the unit price of the product, and the quantity of the order. Our purpose is to filter out orders with a large transaction volume and insert these orders into another Kafka data stream for further analysis.
Users can specify the following SQL statements in the SQL file:
```
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
```
The ORDERS table defined in the first statement represents the input stream. The LOCATION field specifies the ZooKeeper address (localhost:2181), the path of the brokers in ZooKeeper (/brokers), and the topic (orders). The TBLPROPERTIES field specifies the configuration items of the KafkaProducer. The current implementation of storm-sql-kafka requires the LOCATION and TBLPROPERTIES items to be specified even if the table is read-only or write-only.
Similarly, the LARGE_ORDERS table defined in the second statement represents the output stream. The third statement is a SELECT statement that defines the topology: it instructs StormSQL to filter all orders in the external table ORDERS, compute the total price, and insert the matching records into the Kafka stream specified by LARGE_ORDERS.
To run this example, the user needs to include the data source (storm-sql-kafka in this example) and its dependencies in the classpath. One way is to put the required jars in the extlib directory:
```
$ cp curator-client-2.5.0.jar curator-framework-2.5.0.jar zookeeper-3.4.6.jar extlib/
$ cp scala-library-2.10.4.jar kafka-clients-0.8.2.1.jar kafka_2.10-0.8.2.1.jar metrics-core-2.2.0.jar extlib/
$ cp json-simple-1.1.1.jar extlib/
$ cp jackson-annotations-2.6.0.jar extlib/
$ cp storm-kafka-*.jar storm-sql-kafka-*.jar storm-sql-runtime-*.jar extlib/
```
Next, submit the SQL statement to StormSQL:
```
$ bin/storm sql order_filtering.sql order_filtering
```
Now you should be able to see the order_filtering topology in the Storm UI.
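To push test data through the topology, you need JSON messages on the orders topic whose fields match the ORDERS schema (the current Kafka connector assumes JSON input and output; see the limitations below). The following is a minimal, illustrative feeder using the kafka-clients producer API; the broker address and field names are taken from the example above, while the hand-built JSON string and the plain string serializers are just for this standalone test client and are not part of StormSQL.
```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrdersFeeder {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker as in the TBLPROPERTIES of the ORDERS table.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        // Plain string serializers are enough for this test client.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // One order per message; the JSON fields match the ORDERS schema
            // (ID, UNIT_PRICE, QUANTITY). UNIT_PRICE * QUANTITY = 60 > 50,
            // so this record should show up in the large_orders topic.
            String order = "{\"ID\": 1, \"UNIT_PRICE\": 6, \"QUANTITY\": 10}";
            producer.send(new ProducerRecord<String, String>("orders", "1", order));
        } finally {
            producer.close();
        }
    }
}
```
The filtered output can then be inspected by consuming the large_orders topic with any Kafka consumer.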
Current limitations
- Aggregation, windowing, and joins have not yet been implemented.
- Specifying the parallelism of the topology is not yet supported; all processing tasks run with a parallelism of 1.
- Users also need to provide the dependencies of the external data sources in the extlib directory; otherwise the topology will fail to run with a ClassNotFoundException.
- The current Kafka connector in StormSQL assumes that both the input and output data are in JSON format. The connector does not yet support INPUTFORMAT and OUTPUTFORMAT.