23 Getting to Know Siddhi
siddhi-base
-
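Event-driven: every piece of data that is consumed, processed, or sent is treated as an event.
-
A stream processing and complex event processing (CEP) platform.
-
A Siddhi application is a SQL-like script. In a .siddhi script, a semicolon ends each statement.
-
It contains consumers (sources), publishers (sinks), streams, queries, tables, functions, and other necessary definitions.
-
It can receive events from and publish events to many kinds of transports, such as TCP, HTTP, Kafka, and file.
-
It can accept events in different data formats and map between them: JSON, text, XML, and key-value.
-
It processes events to transform and analyze them.
-
Flow: receive and consume an event, pass it to the matching query, generate a new event according to the query logic, and send the new event to a stream.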
-
Stream and Query
-
define stream InputTemperatureStream (sensorId string, temperature double);
-- Defines a stream whose attributes are a string sensor ID and a double temperature.
@info(name = 'Pass-through')  -- optional annotation that names the query
from InputTemperatureStream
select *
insert into TemperatureAndSensorStream;
-- Consumes events from InputTemperatureStream, takes all attributes with select *,
-- and outputs the new events to TemperatureAndSensorStream.
@info(name = 'Simple-selection')
from TemperatureAndSensorStream  -- consumes events from TemperatureAndSensorStream
select temperature
insert into TemperatureOnlyStream;  -- outputs only temperature to TemperatureOnlyStream
-- When the event ['aq-14', 35.4] is sent to InputTemperatureStream:
-- InputTemperatureStream     : ['aq-14', 35.4]
-- TemperatureAndSensorStream : ['aq-14', 35.4]
-- TemperatureOnlyStream      : [35.4]
-
This example uses one annotation, @info, and the keywords define stream, from, select, and insert into.
-
-
Source and Sink
-
@source(type='http', receiver.url='http://0.0.0.0:8006/temp', @map(type='json'))
-- Source: HTTP, listening on http://0.0.0.0:8006/temp and accepting JSON.
define stream TemperatureStream (sensorId string, temperature double);
@sink(type='log')  -- sink that writes to the log
@sink(type='kafka', topic='temperature', bootstrap.servers='localhost:9092',
      @map(type='json', @payload("""{"temp":"{{temperature}}"}""")))
-- Sink: Kafka, publishing JSON to the topic temperature on the broker localhost:9092;
-- the value of temperature is mapped to the JSON field temp.
define stream TemperatureOnlyStream (temperature double);  -- output stream
@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
-- Selects the temperature attribute from TemperatureStream and inserts it into TemperatureOnlyStream.
-- Input: an event is generated when a JSON message like this is sent to http://0.0.0.0:8006/temp:
--   { "event":{ "sensorId":"aq-14", "temperature":35.4 } }
-- After processing, the event reaches TemperatureOnlyStream and is published through the log and kafka sinks.
-- log: uses the passThrough mapper and prints to the console:
--   Event{timestamp=1574515771712, data=[35.4], isExpired=false}
-- kafka: maps the event to JSON and publishes it to the temperature topic:
--   {"temp":"35.4"}
-
This example uses the annotations @source, @sink, @map, and @info, plus @payload inside the Kafka sink's @map.
-
-
Table and Store
-
define stream TemperatureStream (sensorId string, temperature double);  -- defines a stream
define table TemperatureLogTable (sensorId string, roomNo string, temperature double);
-- Defines a table; with no @store annotation it is kept in memory.
@store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/sid", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
-- Backs the following table with a relational database, using the given JDBC URL, username, password, and driver name.
define table SensorIdInfoTable (sensorId string, roomNo string);
@info(name = 'Join-query')
from TemperatureStream as t join SensorIdInfoTable as s
    on t.sensorId == s.sensorId  -- joins the stream with the table where the sensor IDs are equal
select t.sensorId as sensorId, s.roomNo as roomNo, t.temperature as temperature
insert into TemperatureLogTable;  -- stores the join result in the table
-- When SensorIdInfoTable contains the record ['aq-14', '789'] and the event ['aq-14', 35.4] arrives at TemperatureStream,
-- an event ['aq-14', '789', 35.4] is generated and added to TemperatureLogTable.
-- To read the table, run an on-demand query by calling the query() method of SiddhiAppRuntime:
--   from TemperatureLogTable select *
-
This example uses one annotation, @store, and the table keyword.
-
-
Siddhi Application
-
@app:name('Temperature-Processor')  -- Siddhi app name
@app:description('App for processing temperature data.')  -- app description
@source(type='inMemory', topic='SensorDetail')
define stream TemperatureStream (sensorId string, temperature double);
-- Source: consumes events that other apps publish to the in-memory topic SensorDetail.
@sink(type='inMemory', topic='Temperature')
define stream TemperatureOnlyStream (temperature double);
-- Sink: publishes events to the in-memory topic Temperature for other apps.
@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
-- The query selects temperature from the events the source receives and passes them to the sink, which publishes them.
-- ['aq-14', 35.4] received from the topic SensorDetail is passed to TemperatureStream.
-- [35.4] arrives at TemperatureOnlyStream and is delivered through the topic Temperature to other in-memory subscribers.
-
This example uses two app-level annotations, @app:name and @app:description.
-
-
Basic Types: int, long, float, double, bool, string, object
-
Related functions: convert(), instanceOf<Type>()
-
object values include list, map, and others.
-
define stream PatientRegistrationInputStream (
    seqNo long, name string, age int, height float, weight double,
    photo object, isEmployee bool, wardNo object);
define stream PatientRegistrationStream (
    seqNo long, name string, age int, height double, weight double,
    photo object, isPhotoString bool, isEmployee bool, wardNo int);
-- The two streams differ in some attribute types, so the query converts between them.
@info(name = 'Type-processor')
from PatientRegistrationInputStream
select seqNo, name, age,
    convert(height, 'double') as height,
    weight, photo,
    instanceOfString(photo) as isPhotoString,
    isEmployee,
    cast(wardNo, 'int') as wardNo
insert into PatientRegistrationStream;
-- convert(value, to_type): returns the value converted to the given type
-- instanceOfString(value): returns whether the value is a string
-- cast(value, to_type): returns the value cast to the given type
-- convert changes the type of the value; cast only reinterprets a value (such as an object) that already holds the target type
-- PatientRegistrationInputStream event: [1200098, 'Peter Johnson', 34, 194.3f, 69.6, #Fjoiu59%3hkjnknk$#nFT, true, 34]
-- PatientRegistrationStream event:      [1200098, 'Peter Johnson', 34, 194.3, 69.6, #Fjoiu59%3hkjnknk$#nFT, false, true, 34]
-
This example uses three functions: convert(), instanceOf<Type>(), and cast().
-
-
Map
-
define stream CoupleDealInfoStream (item1 string, price1 double, item2 string, price2 double);
@info(name = 'Create-map')
from CoupleDealInfoStream
select map:create(item1, price1, item2, price2) as itemPriceMap
insert into NewMapStream;
-- map:create(key1, value1, ...): creates a map, here with the pairs item1:price1 and item2:price2
@info(name = 'Check-map')
from NewMapStream
select map:isMap(itemPriceMap) as isMap,
    map:containsKey(itemPriceMap, 'Cookie') as isCookiePresent,
    map:containsValue(itemPriceMap, 24.0) as isThereItemWithPrice24,
    map:isEmpty(itemPriceMap) as isEmpty,
    map:keys(itemPriceMap) as keys,
    map:size(itemPriceMap) as size
insert into MapAnalysisStream;
-- map:isMap(map): checks whether the value is a map
-- map:containsKey(map, key): checks whether the key is present
-- map:containsValue(map, value): checks whether the value is present
-- map:isEmpty(map): checks whether the map is empty
-- map:keys(map): returns the keys
-- map:size(map): returns the number of entries
@info(name = 'Clone-and-update')
from NewMapStream
select map:replace(
           map:put(map:clone(itemPriceMap), "Gift", 1.0),
           "Cake", 12.0) as itemPriceMap
insert into ItemInsertedMapStream;
-- map:replace(map, key, value): replaces the value of key with the new value if the key is already in the map
-- map:put(map, key, value): adds the key-value pair and returns the map
-- map:clone(map): returns a clone of the map
-- CoupleDealInfoStream receives ['Chocolate', 18.0, 'Ice Cream', 24.0]
-- NewMapStream:          [{Ice Cream=24.0, Chocolate=18.0}]
-- MapAnalysisStream:     [true, false, true, false, [Ice Cream, Chocolate], 2]
-- ItemInsertedMapStream: [{Ice Cream=24.0, Gift=1.0, Chocolate=18.0}]  ("Cake" is not a key, so nothing is replaced)
-
-
List
-
define stream ProductComboStream (product1 string, product2 string, product3 string);
@info(name = 'Create-list')
from ProductComboStream
select list:create(product1, product2, product3) as productList
insert into NewListStream;
-- list:create(item1, item2, ...): creates a list
@info(name = 'Check-list')
from NewListStream
select list:isList(productList) as isList,
    list:contains(productList, 'Cake') as isCakePresent,
    list:isEmpty(productList) as isEmpty,
    list:get(productList, 1) as valueAt1,
    list:size(productList) as size
insert into ListAnalysisStream;
-- list:isList(list): checks whether the value is a list
-- list:contains(list, item): checks whether the list contains the item
-- list:isEmpty(list): checks whether the list is empty
-- list:get(list, index): returns the element at the index (zero-based)
-- list:size(list): returns the size of the list
@info(name = 'Clone-and-update')
from NewListStream
select list:remove(
           list:add(list:clone(productList), "Toffee"),
           "Cake") as productList
insert into UpdatedListStream;
-- list:remove(list, item): removes the item
-- list:add(list, item): adds the item and returns the list
-- list:clone(list): returns a clone of the list
-- ProductComboStream receives ['Ice Cream', 'Chocolate', 'Cake']
-- NewListStream:      [[Ice Cream, Chocolate, Cake]]
-- ListAnalysisStream: [true, true, false, Chocolate, 3]
-- UpdatedListStream:  [[Ice Cream, Chocolate, Toffee]]
-
-
Null
-
define stream ProductInputStream (item string, price double);
define table ProductInfoTable (item string, discount double);
@info(name = 'Check-for-null')
from ProductInputStream[not(item is null)]
select item, price is null as isPriceNull
insert into ProductValidationStream;
-- [condition]: a filter, similar to an if check
-- not(test): negates the boolean result of test
-- value is null: returns whether the value is null
@info(name = 'Outer-join-with-table')
from ProductInputStream as s left outer join ProductInfoTable as t
    on s.item == t.item
select s.item, s.price, t.discount,
    math:power(t.discount, 2) is null as isFunctionReturnsNull,
    t is null as isTNull,
    s is null as isSNull,
    t.discount is null as isTDiscountNull,
    s.item is null as isSItemNull
insert into DiscountValidationStream;
-- math:power(number, exp): exponentiation
-- ProductInputStream receives ['Cake', 12.0]
-- ProductValidationStream:  [Cake, false]
-- DiscountValidationStream: [Cake, 12.0, null, true, true, false, true, false]
-
siddhi-event&data
event cleansing
-
value based filtering
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'EqualsFilter')
from TemperatureStream[sensorId == 'A1234']  -- [] is a filter, like an if check
select *
insert into SenorA1234TemperatureStream;
@info(name = 'RangeFilter')
from TemperatureStream[temperature > -2 and temperature < 40]  -- combines > and < with and
select *
insert into NormalTemperatureStream;
@info(name = 'NullFilter')
from TemperatureStream[sensorId is null]  -- is null check
select *
insert into InValidTemperatureStream;
-
Summary: [] filters events like an if check; the conditions here use ==, >, <, and, and is null.
-
-
if-then-else
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'SimpleIfElseQuery')
from TemperatureStream
select sensorId,
    ifThenElse(temperature > -2, 'Valid', 'InValid') as isValid
insert into ValidTemperatureStream;
-- ifThenElse(test, trueValue, falseValue): a ternary expression
@info(name = 'ComplexIfElseQuery')
from TemperatureStream
select sensorId,
    ifThenElse(temperature > -2,
        ifThenElse(temperature > 40, 'High', 'Normal'),
        'InValid') as tempStatus  -- nested ternary expression
insert into ProcessedTemperatureStream;
-
ifThenElse(test, trueValue, falseValue) works like a ternary expression and can be nested.
-
-
regex matching
-
define stream SweetProductionStream (name string, amount int);
@info(name='ProcessSweetProductionStream')
from SweetProductionStream
select name,
    regex:matches('chocolate(.*)', name) as isAChocolateProduct,
    regex:group('.*\s(.*)', name, 1) as sweetType
insert into ChocolateProductStream;
-- regex:matches(regex, string): returns true or false depending on whether the string matches the pattern
-- regex:group(regex, string, index): matches the string against the regex and returns the capture group at the given index
-- '.*\s(.*)': any characters, a whitespace character, then any characters (captured as group 1)
-- SweetProductionStream receives ['chocolate cake', 34]
-- ChocolateProductStream: ['chocolate cake', true, 'cake']
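The two regex calls can be tried outside Siddhi with a small Python sketch. This is only an analogy, not Siddhi code: it assumes regex:matches requires the whole string to match (modelled with `re.fullmatch`) and that regex:group returns the numbered capture group.

```python
import re

name = "chocolate cake"

# Analogue of regex:matches('chocolate(.*)', name): the whole string must match (assumed).
is_chocolate_product = re.fullmatch(r"chocolate(.*)", name) is not None

# Analogue of regex:group('.*\s(.*)', name, 1): capture group 1 is the text
# after the last whitespace character, because the leading .* is greedy.
match = re.fullmatch(r".*\s(.*)", name)
sweet_type = match.group(1) if match else None

print(is_chocolate_product, sweet_type)  # True cake
```

With the sample event ['chocolate cake', 34] this gives the same two derived values as the ChocolateProductStream output listed above.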
-
-
default
-
define stream PatientRegistrationInputStream (
    seqNo long, name string, age int, height float, weight double,
    photo object, isEmployee bool, wardNo object);
@info(name = 'SimpleIfElseQuery')
from PatientRegistrationInputStream
select default(name, 'invalid') as name,
    default(seqNo, 0l) as seqNo,
    default(weight, 0d) as weight,
    default(age, 0) as age,
    default(height, 0f) as height
insert into PreprocessedPatientRegistrationInputStream;
-- default(attribute, defaultValue): returns defaultValue when the attribute is null
-- When all of these attributes are null, the output event is ['invalid', 0, 0.0, 0, 0.0]
-
-
type based filtering
-
define stream SweetProductionStream (name string, amount int);
@info(name='ProcessSweetProductionStream')
from SweetProductionStream
select instanceOfInteger(amount) as isAIntInstance, name, amount
insert into ProcessedSweetProductionStream;
-- The result of the type check is added to the event so that later queries can filter on it.
-
-
remove duplicate event
-
define stream TemperatureStream (sensorId string, seqNo string, temperature double);
@info(name = 'Deduplicate-sensorId')
from TemperatureStream#unique:deduplicate(sensorId, 1 min)
select *
insert into UniqueSensorStream;
-- stream#unique:deduplicate(unique_key, time_interval): drops events whose key repeats within the time interval;
-- events that arrive after the interval has passed are not treated as duplicates
@info(name = 'Deduplicate-sensorId-and-seqNo')
from TemperatureStream#unique:deduplicate(str:concat(sensorId, '-', seqNo), 1 min)
select *
insert into UniqueSensorSeqNoStream;
-- str:concat(str1, str2, ...): concatenates its arguments, here sensorId, '-', and seqNo, to build a composite key
-
data transformation
-
math & logical operation
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'celciusTemperature')
from TemperatureStream
select sensorId, (temperature * 9 / 5) + 32 as temperature  -- Celsius to Fahrenheit
insert into FahrenheitTemperatureStream;
@info(name = 'Overall-analysis')
from FahrenheitTemperatureStream
select sensorId, math:floor(temperature) as approximateTemp  -- math:floor(number) rounds down
insert all events into OverallTemperatureStream;
@info(name = 'RangeFilter')
from OverallTemperatureStream[approximateTemp > -2 and approximateTemp < 40]
select *
insert into NormalTemperatureStream;
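To see what the three queries do to a concrete value, here is a short Python sketch of the same arithmetic. The function names `to_fahrenheit` and `passes_range_filter` are made up for this illustration and the input temperatures are sample values, not data from the notes.

```python
import math

def to_fahrenheit(celsius):
    # Same expression as the first query: (temperature * 9 / 5) + 32
    return (celsius * 9 / 5) + 32

def passes_range_filter(approximate_temp):
    # Same condition as the RangeFilter query.
    return -2 < approximate_temp < 40

for celsius in (35.4, 1.0):
    approximate_temp = math.floor(to_fahrenheit(celsius))
    print(celsius, approximate_temp, passes_range_filter(approximate_temp))
# 35.4 95 False
# 1.0 33 True
```

Note that the range filter runs on the Fahrenheit value, so a reading of 35.4 degrees Celsius (95 after conversion and rounding down) does not reach NormalTemperatureStream.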
-
-
transform json
-
define stream InputStream(jsonString string);
from InputStream
select json:toObject(jsonString) as jsonObj
insert into PersonalDetails;
-- json:toObject(string): parses a JSON string into an object
from PersonalDetails
select jsonObj,
    json:getString(jsonObj, '$.name') as name,
    json:isExists(jsonObj, '$.salary') as isSalaryAvailable,
    json:toString(jsonObj) as jsonString
insert into OutputStream;
-- json:getString(object, '$.item'): returns the value at the path as a string
-- json:isExists(object, '$.item'): checks whether a value exists at the path
-- json:toString(object): returns the object as a JSON string
from OutputStream[isSalaryAvailable == false]
select json:setElement(jsonObj, '$', 0f, 'salary') as jsonObj
insert into PreprocessedStream;
-- json:setElement(object, '$', value, 'item'): adds item with the given value to the object at the path
-- Input:
--   { "name" : "siddhi.user", "address" : { "country": "Sri Lanka", }, "contact": "+9xxxxxxxx" }
-- OutputStream:
--   [ {"address":{"country":"Sri Lanka"},"contact":"+9xxxxxxxx","name":"siddhi.user"}, siddhi.user, false,
--     "{\"name\" : \"siddhi.user\", \"address\" : { \"country\": \"Sri Lanka\", }, \"contact\": \"+9xxxxxxxx\"}" ]
-- PreprocessedStream:
--   { "name" : "siddhi.user", "salary": 0.0, "address" : { "country": "Sri Lanka", }, "contact": "+9xxxxxxxx" }
-
data summarization
-
sliding time
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.time(1 min)  -- window.time(time_gap): sliding window over the events of the last 1 minute
select avg(temperature) as avgTemperature,
    max(temperature) as maxTemperature,
    count() as numberOfEvents
insert all events into OverallTemperatureStream;
-- avg(number), max(number), count(): aggregate functions
-- insert all events into: emits output both when events arrive and when they expire from the window
@info(name = 'SensorId-analysis')
from TemperatureStream#window.time(30 sec)
select sensorId,
    avg(temperature) as avgTemperature,
    min(temperature) as minTemperature
group by sensorId
having avgTemperature > 20.0  -- group by key, then filter the groups with having
insert into SensorIdTemperatureStream;
-- Only the per-sensor results that pass the having condition are inserted into SensorIdTemperatureStream.
-
stream#handler can be read as the handler constraining how the stream's events are processed, for example deduplication or sliding-window aggregation.
-
-
batch (tumbling) time
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.timeBatch(1 min)
select avg(temperature) as avgTemperature,
    max(temperature) as maxTemperature,
    count() as numberOfEvents
insert into OverallTemperatureStream;
-- stream#window.timeBatch(time_gap): each time_gap is one batch, counted from the first event that arrives
@info(name = 'SensorId-analysis')
from TemperatureStream#window.timeBatch(30 sec, 0)
select sensorId,
    avg(temperature) as avgTemperature,
    min(temperature) as minTemperature
group by sensorId
having avgTemperature > 20.0
insert into SensorIdTemperatureStream;
-- stream#window.timeBatch(time_gap, start_time): each time_gap is one batch, aligned to the given start time
-
The difference between window.time() and window.timeBatch(): the former is a sliding window whose output is updated as each event arrives, while the latter collects events and writes output to the stream once per interval, when the interval ends.
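The contrast between the two window types can be illustrated with a plain-Python simulation. This is a simplified sketch, not how Siddhi is implemented: timestamps are integer seconds, batches are aligned to time 0, and expiry-triggered output of the sliding window is ignored. The function names and the sample events are invented for the illustration.

```python
from collections import deque

def sliding_time_avg(events, width):
    """Emit one average per arriving event, over the last `width` seconds."""
    window, out = deque(), []
    for ts, value in events:
        window.append((ts, value))
        # Drop events that are `width` seconds old or older.
        while window[0][0] <= ts - width:
            window.popleft()
        out.append((ts, sum(v for _, v in window) / len(window)))
    return out

def tumbling_time_avg(events, width):
    """Emit one average per non-overlapping `width`-second batch, at the batch end."""
    batches = {}
    for ts, value in events:
        batches.setdefault(ts // width, []).append(value)
    return [((idx + 1) * width, sum(vs) / len(vs))
            for idx, vs in sorted(batches.items())]

events = [(0, 10.0), (20, 20.0), (50, 30.0), (70, 40.0)]  # (timestamp, temperature)
print(sliding_time_avg(events, 60))
# [(0, 10.0), (20, 15.0), (50, 20.0), (70, 30.0)]
print(tumbling_time_avg(events, 60))
# [(60, 20.0), (120, 40.0)]
```

The sliding version produces four results for four events, each reflecting the most recent minute; the batch version produces only one result per minute.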
-
-
sliding event count
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.length(4)
select avg(temperature) as avgTemperature,
    max(temperature) as maxTemperature,
    count() as numberOfEvents
insert into OverallTemperatureStream;
-- stream#window.length(len): sliding window bounded by count; aggregates the last len events
@info(name = 'SensorId-analysis')
from TemperatureStream#window.length(5)
select sensorId,
    avg(temperature) as avgTemperature,
    min(temperature) as minTemperature
group by sensorId
having avgTemperature >= 20.0
insert into SensorIdTemperatureStream;
-
-
batch(tumbling) event count
-
define stream TemperatureStream (sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.lengthBatch(4)
select avg(temperature) as avgTemperature,
    max(temperature) as maxTemperature,
    count() as numberOfEvents
insert into OverallTemperatureStream;
-- stream#window.lengthBatch(len): tumbling window bounded by count; every len events form one batch that is aggregated together
@info(name = 'SensorId-analysis')
from TemperatureStream#window.lengthBatch(5)
select sensorId,
    avg(temperature) as avgTemperature,
    min(temperature) as minTemperature
group by sensorId
having avgTemperature >= 20.0
insert into SensorIdTemperatureStream;
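The same sliding-versus-batch distinction applies to window.length and window.lengthBatch. The Python sketch below only imitates the idea on a list of temperatures; the function names and data are hypothetical, and the assumption that a batch window yields one aggregate per full batch is a simplification.

```python
from collections import deque

def sliding_length_avg(values, n):
    """Average over the last n values, recomputed on every arrival."""
    window, out = deque(maxlen=n), []
    for value in values:
        window.append(value)  # the oldest value falls out automatically
        out.append(sum(window) / len(window))
    return out

def batch_length_avg(values, n):
    """Average per full, non-overlapping batch of n values."""
    out = []
    for start in range(0, len(values) - n + 1, n):
        batch = values[start:start + n]
        out.append(sum(batch) / n)
    return out

temps = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0]
print(sliding_length_avg(temps, 4))  # [10.0, 15.0, 20.0, 25.0, 35.0, 45.0, 55.0, 65.0]
print(batch_length_avg(temps, 4))    # [25.0, 65.0]
```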
-
-
session
-
define stream PurchaseStream (userId string, item string, price double);
@info(name = 'Session-analysis')
from PurchaseStream#window.session(1 min, userId)
select userId, count() as totalItems, sum(price) as totalPrice
group by userId
insert into UserIdPurchaseStream;
-- stream#window.session(session_gap, session_key): aggregates events per key; a session ends when no event for that key arrives within session_gap
@info(name = 'Session-analysis-with-late-event-arrivals')
from PurchaseStream#window.session(1 min, userId, 20 sec)
select userId, count() as totalItems, sum(price) as totalPrice
group by userId
insert into OutOfOrderUserIdPurchaseStream;
-- Aggregates events over a session window keyed by userId with a session gap of 1 minute, and allows 20 seconds of latency to capture late arrivals.
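A session window can be pictured as grouping each user's events until that user stays silent for longer than the gap. The Python sketch below imitates only that idea: the function name `sessionize` and the sample purchases are hypothetical, timestamps are in seconds, a session is closed when the silence exceeds the gap (an assumed boundary rule), and the allowed-latency parameter is not modelled.

```python
def sessionize(events, gap):
    """Group (ts, user, price) events into per-user sessions split by `gap` seconds of silence.

    Returns (user, total_items, total_price) tuples: closed sessions first,
    then the sessions still open at the end of the input.
    """
    open_sessions = {}  # user -> [last_ts, item_count, price_sum]
    closed = []
    for ts, user, price in sorted(events):
        session = open_sessions.get(user)
        if session is not None and ts - session[0] > gap:
            closed.append((user, session[1], session[2]))
            session = None
        if session is None:
            session = [ts, 0, 0.0]
            open_sessions[user] = session
        session[0] = ts
        session[1] += 1
        session[2] += price
    closed.extend((user, s[1], s[2]) for user, s in open_sessions.items())
    return closed

purchases = [(0, "u1", 5.0), (30, "u1", 7.0), (40, "u2", 1.0), (100, "u1", 2.0)]
print(sessionize(purchases, 60))
# [('u1', 2, 12.0), ('u1', 1, 2.0), ('u2', 1, 1.0)]
```

User u1 ends up with two sessions because 70 seconds pass between the second and third purchase.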
-
-
named window
-
define stream TemperatureStream (sensorId string, temperature double);
define window OneMinTimeWindow (sensorId string, temperature double) time(1 min);
-- define window name (attributes) window_type(parameters): a named window that several queries can share
@info(name = 'Insert-to-window')
from TemperatureStream
insert into OneMinTimeWindow;
@info(name = 'Min-max-analysis')
from OneMinTimeWindow
select min(temperature) as minTemperature,
    max(temperature) as maxTemperature
insert into MinMaxTemperatureOver1MinStream;
@info(name = 'Per-sensor-analysis')
from OneMinTimeWindow
select sensorId, avg(temperature) as avgTemperature
group by sensorId
insert into AvgTemperaturePerSensorStream;
-
data pipelining
-
stream join
-
define stream TemperatureStream (roomNo string, temperature double);
define stream HumidityStream (roomNo string, humidity double);
@info(name = 'Equi-join')
from TemperatureStream#window.unique:time(roomNo, 1 min) as t
    join HumidityStream#window.unique:time(roomNo, 1 min) as h
    on t.roomNo == h.roomNo
select t.roomNo, t.temperature, h.humidity
insert into TemperatureHumidityStream;
-- stream#window.unique:time(key, time_gap): keeps only the latest event per key within the time window
-- This is a stream-to-stream join.
@info(name = 'Join-on-temperature')
from TemperatureStream as t
    left outer join HumidityStream#window.time(1 min) as h
    on t.roomNo == h.roomNo
select t.roomNo, t.temperature, h.humidity
insert into EnrichedTemperatureStream;
-- Stream t is left outer joined with the events of stream h from the last 1 minute.
-
-
partition events by value
-
define stream LoginStream (userID string, loginSuccessful bool);
-- @purge: every 10 seconds, finds and removes partition instances that have not received an event for 1 hour
-- partition with: partitions the events by userID
@purge(enable='true', interval='10 sec', idle.period='1 hour')
partition with ( userID of LoginStream )
begin
    @info(name='Aggregation-query')
    from LoginStream#window.length(3)
    select userID, loginSuccessful, count() as attempts
    group by loginSuccessful
    insert into #LoginAttempts;
    -- #LoginAttempts is an inner stream of the partition and can be accessed only inside it
    @info(name='Alert-query')
    from #LoginAttempts[loginSuccessful==false and attempts==3]
    select userID, "3 consecutive login failures!" as message
    insert into UserSuspensionStream;
end;
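Partitioning can be thought of as giving every userID its own private copy of the queries and their window state. The Python sketch below imitates that with one bounded deque per user; the function name `login_alerts` and the sample logins are hypothetical, and purging of idle partitions is not modelled.

```python
from collections import defaultdict, deque

def login_alerts(events, n=3):
    """Return an alert whenever a user's last n login attempts all failed."""
    # One length-n window per userID, mirroring one partition instance per key.
    recent = defaultdict(lambda: deque(maxlen=n))
    alerts = []
    for user_id, login_successful in events:
        window = recent[user_id]
        window.append(login_successful)
        if len(window) == n and not any(window):
            alerts.append((user_id, f"{n} consecutive login failures!"))
    return alerts

logins = [("a", False), ("b", False), ("a", False),
          ("b", True), ("a", False), ("b", False)]
print(login_alerts(logins))  # [('a', '3 consecutive login failures!')]
```

User b never triggers an alert because the successful login sits inside b's own window, regardless of how many failures user a produces in between.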
-
-
scatter and gather(String)
-
define stream PurchaseStream (userId string, items string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#str:tokenize(items, ',', true)
select userId, token as item, store
insert into TokenizedItemStream;
-- str:tokenize(input, separator, distinct): splits items on the separator and emits one event per token (attribute token);
-- the third argument requests distinct tokens only
@info(name = 'Transform-query')
from TokenizedItemStream
select userId, str:concat(store, "-", item) as itemKey
insert into TransformedItemStream;
-- str:concat(): concatenates store, "-", and item
@info(name = 'Gather-query')
from TransformedItemStream#window.batch()
select userId, str:groupConcat(itemKey, ",") as itemKeys
insert into GroupedPurchaseItemStream;
-- str:groupConcat(value, separator): joins the values of the batch with the separator
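The scatter, transform, and gather steps map onto split, map, and join on an ordinary string. A rough Python equivalent follows; the function name `scatter_gather` and the sample purchase are invented for the illustration, and keeping distinct tokens in first-seen order is an assumption about the distinct flag.

```python
def scatter_gather(user_id, items, store):
    # Scatter: split on ',' and keep distinct tokens in first-seen order.
    tokens = list(dict.fromkeys(items.split(",")))
    # Transform: prefix every token with the store name.
    item_keys = [f"{store}-{token}" for token in tokens]
    # Gather: join the transformed tokens back into one string.
    return user_id, ",".join(item_keys)

print(scatter_gather("u1", "cake,cookie,cake", "S1"))  # ('u1', 'S1-cake,S1-cookie')
```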
-
-
scatter and gather(json)
-
define stream PurchaseStream (order string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#json:tokenize(order, '$.order.items')
select json:getString(order, '$.order.id') as orderId,  -- extracts order.id
    jsonElement as item,
    store
insert into TokenizedItemStream;
-- json:tokenize(json, '$.path'): reads the array at the path and emits one event per element (attribute jsonElement)
@info(name = 'Transform-query')
from TokenizedItemStream
select orderId,
    ifThenElse(json:getString(item, 'name') == "cake",
        json:toString(
            json:setElement(item, 'price', json:getDouble(item, 'price') - 5)),
        item) as item,
    store
insert into DiscountedItemStream;
@info(name = 'Gather-query')
from DiscountedItemStream#window.batch()
select orderId, json:group(item) as items, store
insert into GroupedItemStream;
-- json:group(item): returns a JSON array of the items
@info(name = 'Format-query')
from GroupedItemStream
select str:fillTemplate("""
    {"discountedOrder":
        {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
    }""", orderId, items, store) as discountedOrder
insert into DiscountedOrderStream;
-- str:fillTemplate(template, args...): {{n}} is a placeholder for the n-th argument, similar to format(); returns the assembled string
-
siddhi-others
patterns & trends
-
simple pattern: matches one or more events arriving over time
-
define stream TemperatureStream(roomNo int, temp double);
@sink(type = 'log')
define stream HighTempAlertStream(roomNo int, initialTemp double, finalTemp double);
@info(name='temperature-increase-identifier')
from every( e1 = TemperatureStream )
    -> e2 = TemperatureStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]
    within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into HighTempAlertStream;
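One way to read this pattern: every temperature event opens a match attempt, and the attempt completes when a later event for the same room is at least 5 degrees higher within 10 minutes. The Python sketch below imitates that reading on a list of events. It is an approximation, not Siddhi's matching engine; the function name `temperature_jumps` and the sample readings are hypothetical.

```python
def temperature_jumps(events, rise=5.0, within=600):
    """events: (ts_seconds, room_no, temp) tuples in arrival order."""
    pending = []  # every e1 that is still waiting for its e2
    alerts = []
    for ts, room, temp in events:
        still_waiting = []
        for start_ts, start_room, start_temp in pending:
            if ts - start_ts > within:
                continue  # this match attempt timed out
            if start_room == room and start_temp + rise <= temp:
                alerts.append((room, start_temp, temp))  # e1 -> e2 matched
            else:
                still_waiting.append((start_ts, start_room, start_temp))
        pending = still_waiting
        pending.append((ts, room, temp))  # `every`: each event may start a new match
    return alerts

readings = [(0, 1, 20.0), (60, 1, 22.0), (120, 2, 30.0), (180, 1, 26.0), (900, 1, 40.0)]
print(temperature_jumps(readings))  # [(1, 20.0, 26.0)]
```

The reading of 40.0 at 900 seconds raises no alert because all earlier readings are more than 10 minutes old by then.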
-
-
count pattern: matches multiple events received under the same condition
-
define stream TemperatureStream (sensorID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, tempSet double, isOn bool);
@sink(type = 'log')
define stream TemperatureDiffStream(roomNo int, tempDiff double);
from every( e1 = RegulatorStream )
    -> e2 = TemperatureStream[e1.roomNo == roomNo]<1:>
    -> e3 = RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TemperatureDiffStream;
-
-
logical pattern: matches events by their arrival order and logical relationship
-
define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream RoomKeyStream(deviceID long, roomNo int, action string);
@sink(type='log')
define stream RegulatorActionStream(roomNo int, action string);
from every e1=RegulatorStateChangeStream[ action == 'on' ]
    -> e2=RoomKeyStream[ e1.roomNo == roomNo and action == 'removed' ]
       or e3=RegulatorStateChangeStream[ e1.roomNo == roomNo and action == 'off' ]
select e1.roomNo, ifThenElse( e2 is null, 'none', 'stop' ) as action
having action != 'none'
insert into RegulatorActionStream;
-
-
non-occurrence pattern: detects that an expected event did not occur within a given time
-
define stream RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
define stream TemperatureStream (roomNo int, temp double);
@sink(type='log')
define stream RoomTemperatureAlertStream(roomNo int);
from e1=RegulatorStateChangeStream[action == 'on']
    -> not TemperatureStream[e1.roomNo == roomNo and temp <= e1.tempSet] for 30 sec
select e1.roomNo as roomNo
insert into RoomTemperatureAlertStream;
-
-
simple sequence: matches consecutive events in their order of arrival
-
define stream StockRateStream (symbol string, price float, volume int);
@sink(type='log')
define stream PeakStockRateStream (symbol string, rateAtPeak float);
partition with (symbol of StockRateStream)
begin
    from every e1=StockRateStream,
        e2=StockRateStream[e1.price < price],
        e3=StockRateStream[e2.price > price]
        within 10 min
    select e1.symbol, e2.price as rateAtPeak
    insert into PeakStockRateStream;
end;
-
-
sequence with count: a sequence in which a condition can match a counted number of consecutive events
-
define stream TemperatureStream(roomNo int, temp double);
@sink(type='log')
define stream PeekTemperatureStream(roomNo int, initialTemp double, peekTemp double, firstDropTemp double);
partition with (roomNo of TemperatureStream)
begin
    @info(name = 'temperature-trend-analyzer')
    from every e1=TemperatureStream,
        e2=TemperatureStream[ifThenElse(e2[last].temp is null, e1.temp <= temp, e2[last].temp <= temp)]+,
        e3=TemperatureStream[e2[last].temp > temp]
    select e1.roomNo, e1.temp as initialTemp, e2[last].temp as peekTemp, e3.temp as firstDropTemp
    insert into PeekTemperatureStream;
end;
-
-
logical sequence: matches consecutive events combined with logical operators
-
define stream TempSensorStream(deviceID long, isActive bool);
define stream HumidSensorStream(deviceID long, isActive bool);
define stream RegulatorStream(deviceID long, isOn bool);
@sink(type='log')
define stream StateNotificationStream (deviceID long, tempSensorActive bool, humidSensorActive bool);
from every e1=RegulatorStream[isOn == true],
    e2=TempSensorStream and e3=HumidSensorStream
select e1.deviceID, e2.isActive as tempSensorActive, e3.isActive as humidSensorActive
insert into StateNotificationStream;
-
service integration
-
HTTP Service Integration
-
@sink(type='http-call', publisher.url='http://localhost:8005/validate-loan', method='POST', sink.id='loan-validation', @map(type='json'))
define stream LoanValidationStream (clientId long, name string, amount double);
-- Sink: http-call, POSTs each event as JSON to the given URL
@source(type='http-call-response', sink.id='loan-validation', http.status.code='2\d+',
    @map(type='json', @attributes(customerName='trp:name', clientId='trp:clientId', loanAmount='trp:amount',
        interestRate='validation-response.rate', totalYears='validation-response.years-approved')))
define stream SuccessLoanRequestStream(clientId long, customerName string, loanAmount double, interestRate double, totalYears int);
-- Source: http-call-response, receives only responses whose status code starts with 2
@source(type='http-call-response', sink.id='loan-validation', http.status.code='400',
    @map(type='json', @attributes(customerName='trp:name', clientId='trp:clientId',
        failureReason='validation-response.reason')))
define stream FailureLoanRequestStream(clientId long, customerName string, failureReason string);
-- Source: http-call-response, receives only responses whose status code is 400
define stream LoanRequestStream (clientId long, name string, amount double);
@sink(type='log')
define stream LoanResponseStream(clientId long, customerName string, message string);
-- Sink: log
@info(name = 'attribute-projection')
from LoanRequestStream
select clientId, name, amount
insert into LoanValidationStream;
@info(name = 'successful-message-generator')
from SuccessLoanRequestStream
select clientId, customerName, "Loan Request is accepted for processing" as message
insert into LoanResponseStream;
@info(name = 'failure-message-generator')
from FailureLoanRequestStream
select clientId, customerName, str:concat("Loan Request is rejected due to ", failureReason) as message
insert into LoanResponseStream;
-
Separate sources are defined for successful (2xx) responses and for 400 responses. Both map the JSON response to stream attributes, and the resulting messages are published through the log sink.
-
-
gRPC service integration
-
define stream TicketBookingStream (name string, phoneNo string, movie string, ticketClass string, qty int, bookingTime long);
@sink(type='grpc-call', publisher.url = 'grpc://localhost:5003/org.wso2.grpc.EventService/process', sink.id= 'ticket-price', @map(type='json'))
define stream TicketPriceFinderStream (name string, phoneNo string, movie string, ticketClass string, qty int, bookingTime long);
-- Sink: grpc-call
@source(type='grpc-call-response', receiver.url = 'grpc://localhost:9763/org.wso2.grpc.EventService/process', sink.id= 'ticket-price',
    @map(type='json', @attributes(customerName='trp:name', phoneNo='trp:phoneNo', movie='trp:movie',
        qty='trp:qty', bookingTime='trp:bookingTime', ticketPrice='price')))
define stream TicketPriceResponseStream (customerName string, phoneNo string, movie string, qty int, ticketPrice double, bookingTime long);
-- Source: grpc-call-response
@sink(type='log')
define stream TotalTicketPaymentStream (customerName string, phoneNo string, movie string, totalAmount double, bookingTime long);
-- Sink: log
@info(name = 'filter-basic-ticket-bookings')
from TicketBookingStream[ticketClass == "BASIC"]
select name as customerName, phoneNo, movie, qty * 20.0 as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
@info(name = 'filter-non-basic-tickets')
from TicketBookingStream[ticketClass != "BASIC"]
select *
insert into TicketPriceFinderStream;
@info(name = 'total-price-calculator')
from TicketPriceResponseStream
select customerName, phoneNo, movie, (qty * ticketPrice) as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
-
rate limiting
-
rate limit based on time
-
define stream APIRequestStream (apiName string, version string, tier string, user string, userEmail string);
define stream UserNotificationStream (user string, apiName string, version string, tier string, userEmail string, throttledCount long);
@info(name='api-throttler')
from APIRequestStream#window.timeBatch(1 min, 0, true)
select apiName, version, user, tier, userEmail, count() as totalRequestCount
group by apiName, version, user
having totalRequestCount == 3 or totalRequestCount == 0
insert all events into ThrottledStream;
@info(name='throttle-flag-generator')
from ThrottledStream
select apiName, version, user, tier, userEmail,
    ifThenElse(totalRequestCount == 0, false, true) as isThrottled
insert into ThrottleOutputStream;
@info(name='notification-generator')
from ThrottleOutputStream[isThrottled]#window.time(1 hour)
select user, apiName, version, tier, userEmail, count() as throttledCount
group by user, apiName, version, tier
having throttledCount > 2
output first every 15 min
insert into UserNotificationStream;
-
error handling
-
logging
-
define stream GlucoseReadingStream (locationRoom string, locationBed string, timeStamp string, sensorID long,
    patientFirstName string, patientLastName string, sensorValue double);
@sink(type = 'http', on.error='log', publisher.url = "http://localhost:8080/logger", method = "POST", @map(type = 'json'))
-- on.error='log': when publishing fails, the error is logged
define stream AbnormalGlucoseReadingStream (timeStampInLong long, locationRoom string, locationBed string, sensorID long,
    patientFullName string, sensorReadingValue double);
@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
    locationRoom, locationBed, sensorID,
    str:concat(patientFirstName, " ", patientLastName) as patientFullName,
    sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;
-
-
wait&retry
-
define stream GlucoseReadingStream (locationRoom string, locationBed string, timeStamp string, sensorID long,
    patientFirstName string, patientLastName string, sensorValue double);
@sink(type = 'http', on.error='wait', publisher.url = "http://localhost:8080/logger", method = "POST", @map(type = 'json'))
-- on.error='wait': when publishing fails, the sink waits, backs off, and retries
-- on.error configures how errors in the sink are handled
define stream AbnormalGlucoseReadingStream (timeStampInLong long, locationRoom string, locationBed string, sensorID long,
    patientFullName string, sensorReadingValue double);
@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
    locationRoom, locationBed, sensorID,
    str:concat(patientFirstName, " ", patientLastName) as patientFullName,
    sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;
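The wait-and-retry idea is a general one and can be sketched in a few lines of Python. This is not Siddhi's implementation: the function name `publish_with_retry`, the doubling delay, and the delay limits are all assumptions chosen for illustration, since the notes do not state the actual back-off intervals.

```python
import time

def publish_with_retry(send, event, base_delay=0.5, max_delay=8.0, sleep=time.sleep):
    """Block until send(event) succeeds, doubling the wait after each failure.

    Returns the number of attempts that were needed.
    """
    delay = base_delay
    attempts = 1
    while True:
        try:
            send(event)
            return attempts
        except ConnectionError:
            sleep(delay)                       # back off before the next attempt
            delay = min(delay * 2, max_delay)  # cap the wait (hypothetical limit)
            attempts += 1

# Demo with a fake endpoint that fails twice and then accepts the event.
outcomes = iter([True, True, False])  # True means "fail this attempt"

def flaky_send(event):
    if next(outcomes):
        raise ConnectionError("endpoint unavailable")

waits = []  # record the requested waits instead of really sleeping
attempts_needed = publish_with_retry(flaky_send, {"sensorID": 1}, sleep=waits.append)
print(attempts_needed, waits)  # 3 [0.5, 1.0]
```

The event is held by the caller for the whole time, which is the trade-off of this strategy: nothing is dropped, but the publishing thread is blocked while the endpoint is down.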
-