Getting Started with Siddhi

siddhi-base

  • Event-driven: all data consumed, processed, and sent is treated as events

  • A stream processing and complex event processing (CEP) platform

  • A Siddhi application is a SQL-like script; in a .siddhi script, a semicolon marks the end of a statement

  • It consists of sources (consumers), sinks (producers), streams, queries, tables, functions, and other necessary constructs

  • It can receive events from and send events to many different types of transports, such as TCP, HTTP, Kafka, files, etc.

  • It can receive and map events in different data formats: JSON, text, XML, key-value

  • It processes events, transforming and analyzing them

  • Processing flow: consume incoming events, pass them to the matching queries, produce new events according to the query logic, and send the new events to output streams

  • Stream and Query

    • define stream InputTemperatureStream (sensorId string, temperature double); -- define a stream carrying a string sensorId and a double temperature
      @info(name = 'Pass-through') -- optional annotation naming the query
      from InputTemperatureStream
      select *
      insert into TemperatureAndSensorStream; -- consumes events from InputTemperatureStream, selects all attributes via select *, and inserts the new events into TemperatureAndSensorStream
      @info(name = 'Simple-selection')
      from TemperatureAndSensorStream -- consumes events from TemperatureAndSensorStream
      select temperature
      insert into TemperatureOnlyStream; -- outputs only temperature to TemperatureOnlyStream
      -- when an event with values ['aq-14', 35.4] is sent to InputTemperatureStream:
      -- InputTemperatureStream : ['aq-14', 35.4]
      -- TemperatureAndSensorStream : ['aq-14', 35.4]
      -- TemperatureOnlyStream : [35.4]
      
    • In total this uses one annotation, @info, and the keywords define stream, from, select, and insert into (a Java sketch of running this app follows below)
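    • A minimal Java sketch (not part of the original notes) of how such an app could be run, assuming the Siddhi 5.x io.siddhi.core API (SiddhiManager, SiddhiAppRuntime, InputHandler, StreamCallback); class and variable names are illustrative:

      import io.siddhi.core.SiddhiAppRuntime;
      import io.siddhi.core.SiddhiManager;
      import io.siddhi.core.event.Event;
      import io.siddhi.core.stream.input.InputHandler;
      import io.siddhi.core.stream.output.StreamCallback;
      import io.siddhi.core.util.EventPrinter;

      public class PassThroughRunner {
          public static void main(String[] args) throws InterruptedException {
              String siddhiApp =
                  "define stream InputTemperatureStream (sensorId string, temperature double); " +
                  "@info(name = 'Pass-through') " +
                  "from InputTemperatureStream select * insert into TemperatureAndSensorStream;";

              SiddhiManager siddhiManager = new SiddhiManager();
              SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

              // print every event that reaches the output stream
              runtime.addCallback("TemperatureAndSensorStream", new StreamCallback() {
                  @Override
                  public void receive(Event[] events) {
                      EventPrinter.print(events);
                  }
              });

              runtime.start();
              // push one event into the input stream: ['aq-14', 35.4]
              InputHandler inputHandler = runtime.getInputHandler("InputTemperatureStream");
              inputHandler.send(new Object[]{"aq-14", 35.4});

              Thread.sleep(500); // give the async pipeline a moment before shutting down
              runtime.shutdown();
              siddhiManager.shutdown();
          }
      }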

  • Source and Sink

    • @source(type='http', receiver.url='http://0.0.0.0:8006/temp',@map(type='json'))
      -- http source receiving JSON events at the URL given by receiver.url (http://0.0.0.0:8006/temp)
      define stream TemperatureStream (sensorId string, temperature double);
      -- define the stream TemperatureStream
      @sink(type='log') -- log sink that prints the output events
      @sink(type='kafka',topic='temperature',bootstrap.servers='localhost:9092',@map(type='json',@payload("""{"temp":"{{temperature}}"}""")))
      -- kafka sink publishing JSON to topic 'temperature' via the given bootstrap.servers; the temperature value is mapped into a JSON field named 'temp'
      define stream TemperatureOnlyStream (temperature double); -- define the output stream
      @info(name = 'Simple-selection') 
      from TemperatureStream 
      select temperature 
      insert into TemperatureOnlyStream;
      -- query that selects the temperature attribute from TemperatureStream and inserts it into TemperatureOnlyStream
      
      
      -- input: when a JSON message like the one below is POSTed to http://0.0.0.0:8006/temp, an event is generated
      {
         "event":{
            "sensorId":"aq-14",
            "temperature":35.4
         }
      }
      -- after processing, the event reaches TemperatureOnlyStream and is published through both the log and kafka sinks
      -- the log sink prints the event (pass-through) to the console:
      Event{timestamp=1574515771712, data=[35.4], isExpired=false}
      -- the kafka sink maps the event to JSON and sends it to the 'temperature' topic:
      {"temp":"35.4"}
      
    • In total this uses the annotations @source, @sink, @map, @info, and (inside the Kafka sink) @payload
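    • A small companion sketch (not from the original notes), in plain Java 11+ using the JDK's built-in HttpClient, showing how the JSON event above could be POSTed to the receiver.url of the http source:

      import java.net.URI;
      import java.net.http.HttpClient;
      import java.net.http.HttpRequest;
      import java.net.http.HttpResponse;

      public class SendTemperatureEvent {
          public static void main(String[] args) throws Exception {
              String body = "{\"event\":{\"sensorId\":\"aq-14\",\"temperature\":35.4}}";

              // the source binds to 0.0.0.0:8006, so localhost reaches it from the same machine
              HttpRequest request = HttpRequest.newBuilder()
                      .uri(URI.create("http://localhost:8006/temp"))
                      .header("Content-Type", "application/json")
                      .POST(HttpRequest.BodyPublishers.ofString(body))
                      .build();

              HttpResponse<String> response = HttpClient.newHttpClient()
                      .send(request, HttpResponse.BodyHandlers.ofString());
              System.out.println("HTTP status: " + response.statusCode());
          }
      }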

  • Table and Store

    • define stream TemperatureStream (sensorId string, temperature double); -- define stream
      define table TemperatureLogTable (sensorId string, roomNo string, temperature double); -- define an in-memory table
      @store(type="rdbms",jdbc.url="jdbc:mysql://localhost:3306/sid",username="root", password="root",jdbc.driver.name="com.mysql.jdbc.Driver")
      -- @store backs the table defined below with a relational database, identified by the JDBC URL, username, password, and driver name
      define table SensorIdInfoTable (sensorId string, roomNo string); -- define a table stored in the RDBMS configured above
      @info(name = 'Join-query')
      from TemperatureStream as t join SensorIdInfoTable as s
           on t.sensorId == s.sensorId
      -- query joining the stream with the table on matching sensorIds
      select t.sensorId as sensorId, s.roomNo as roomNo, t.temperature as temperature
      insert into TemperatureLogTable;
      -- insert the join result into TemperatureLogTable
      
      -- when SensorIdInfoTable contains a record ['aq-14', '789'] and an event ['aq-14', 35.4] arrives at TemperatureStream, an output event ['aq-14', '789', 35.4] is produced and added to TemperatureLogTable
      
      -- querying the data stored in a table
      -- on-demand query
      from TemperatureDetailsTable
      select *
      -- executed by calling the query() method of SiddhiAppRuntime (see the Java sketch below)
      
    • In total this uses one annotation, @store, and the table keyword
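    • A minimal Java sketch (an assumption, not from the notes) of the query() call mentioned above, assuming SiddhiAppRuntime.query(String) returns the matching rows as an Event[]; here it reads back the in-memory TemperatureLogTable defined in this app:

      import io.siddhi.core.SiddhiAppRuntime;
      import io.siddhi.core.event.Event;
      import java.util.Arrays;

      public class TableQueryExample {
          // 'runtime' is the SiddhiAppRuntime created from the Table-and-Store app above
          static void printTableContents(SiddhiAppRuntime runtime) {
              // on-demand query: executed once against the table, outside any streaming query
              Event[] rows = runtime.query("from TemperatureLogTable select *");
              if (rows != null) {
                  for (Event row : rows) {
                      System.out.println(Arrays.toString(row.getData()));
                  }
              }
          }
      }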

  • Siddhi Application

    • @app:name('Temperature-Processor') -- siddhi app name
      @app:description('App for processing temperature data.') -- app description 
      @source(type='inMemory', topic='SensorDetail') 
      define stream TemperatureStream (sensorId string, temperature double);
      -- inMemory source consuming events published by other apps on topic 'SensorDetail'; defines the stream
      @sink(type='inMemory', topic='Temperature')
      define stream TemperatureOnlyStream (temperature double);
      -- inMemory sink publishing events to other apps in the same JVM on topic 'Temperature'; defines the stream
      @info(name = 'Simple-selection')
      from TemperatureStream
      select temperature
      insert into TemperatureOnlyStream;
      -- query that selects the temperature from the source stream and passes the events to the sink stream, which publishes them
      -- when ['aq-14', 35.4] is received on topic SensorDetail, it is passed to TemperatureStream
      -- [35.4] then reaches TemperatureOnlyStream and is delivered on topic Temperature to any other in-memory apps subscribed to it
      
    • In total this uses two app-level annotations: @app:name and @app:description

  • Basic types: int, long, float, double, string, bool, object

    • Type conversion and checking functions: convert(), instanceOf<Type>(), cast()

    • object can hold structures such as list and map

    • define stream PatientRegistrationInputStream (
                       seqNo long, name string, age int,
                       height float, weight double, photo object,
                       isEmployee bool, wardNo object);
      
      define stream PatientRegistrationStream (
                       seqNo long, name string, age int,
                       height double, weight double, photo object,
                       isPhotoString bool, isEmployee bool,
                       wardNo int);
      -- define two streams with different attribute types; the query converts between them
      @info(name = 'Type-processor')
      from PatientRegistrationInputStream
      select seqNo, name, age,
             convert(height, 'double') as height,
      
             weight, photo,
             instanceOfString(photo) as isPhotoString,
      
             isEmployee,
             cast(wardNo, 'int') as wardNo
             
      -- convert(value, to_type) returns the value converted to the target type
      -- instanceOfString(value) returns whether the value is a string
      -- cast(value, to_type) returns the value cast to the target type
      -- convert() actually converts the value (e.g. float to double), while cast() re-types a value that is already of the target type (e.g. an object attribute that holds an int)
      
      insert into PatientRegistrationStream;
      
      -- [1200098, 'Peter Johnson', 34, 194.3f, 69.6, #Fjoiu59%3hkjnknk$#nFT, true, 34] is the PatientRegistrationInputStream event
      -- [1200098, 'Peter Johnson', 34, 194.3, 69.6, #Fjoiu59%3hkjnknk$#nFT, false, true, 34] is the resulting PatientRegistrationStream event
      
    • In total this uses three functions: convert(), instanceOf<Type>(), and cast()

  • Map

    • define stream CoupleDealInfoStream (item1 string, price1 double,item2 string, price2 double);
      @info(name = 'Create-map')
      from CoupleDealInfoStream
      select map:create(item1, price1, item2, price2)
      as itemPriceMap
      -- map:create(key1, value1, key2, value2, ...) creates a map; here with entries item1:price1 and item2:price2
      insert into NewMapStream;
      
      
      @info(name = 'Check-map')
      from NewMapStream
      select map:isMap(itemPriceMap) as isMap,
      
             map:containsKey(itemPriceMap, 'Cookie')
                  as isCookiePresent,
      
             map:containsValue(itemPriceMap, 24.0)
                  as isThereItemWithPrice24,
      
             map:isEmpty(itemPriceMap) as isEmpty,
      
             map:keys(itemPriceMap) as keys,
      
             map:size(itemPriceMap) as size
      insert into MapAnalysisStream;
      -- map:isMap(map) checks whether the value is a map
      -- map:containsKey(map, key) checks whether the map contains the key
      -- map:containsValue(map, value) checks whether the map contains the value
      -- map:isEmpty(map) checks whether the map is empty
      -- map:keys(map) returns the key set
      -- map:size(map) returns the number of entries in the map
      
      @info(name = 'Clone-and-update')
      from NewMapStream
      select map:replace(
                         map:put(map:clone(itemPriceMap),
                                 "Gift",
                                 1.0),
                         "Cake",
                         12.0) as itemPriceMap
      
      insert into ItemInsertedMapStream;
      -- map:replace(map, key, value) replaces the value stored under the key with the new value
      -- map:put(map, key, value) adds the key-value pair to the map and returns the map
      -- map:clone(map) returns a cloned copy of the map
      
      
      -- CoupleDealInfoStream receives the event ['Chocolate', 18.0, 'Ice Cream', 24.0]
      -- NewMapStream [{Ice Cream=24.0, Chocolate=18.0}]
      -- MapAnalysisStream [true, false, true, false, [Ice Cream, Chocolate], 2]
      -- ItemInsertedMapStream [{Ice Cream=12.0, Gift=1.0, Chocolate=18.0}]
      
  • List

    • define stream ProductComboStream (product1 string, product2 string, product3 string);
      
      @info(name = 'Create-list')
      from ProductComboStream
      select list:create(product1, product2, product3)
                  as productList
      
      insert into NewListStream;
      -- list:create(item1,item2,...) 
      @info(name = 'Check-list')
      from NewListStream
      select list:isList(productList) as isList,
      
             list:contains(productList, 'Cake')
                  as isCakePresent,
      
             list:isEmpty(productList) as isEmpty,
      
             list:get(productList, 1) as valueAt1,
      
             list:size(productList) as size
      
      insert into ListAnalysisStream;
      -- list:isList(list) checks whether the value is a list
      -- list:contains(list, item) checks whether the list contains the element
      -- list:isEmpty(list) checks whether the list is empty
      -- list:get(list, index) returns the element at the given index
      -- list:size(list) returns the size of the list
      
      @info(name = 'Clone-and-update')
      from NewListStream
      select list:remove(
                  list:add(list:clone(productList), "Toffee"),
                  "Cake") as productList
      
      insert into UpdatedListStream;
      -- list:remove(list, item) removes the element from the list
      -- list:add(list, item) adds the element and returns the list
      -- list:clone(list) returns a cloned copy of the list
      
      -- ProductComboStream receives the event ['Ice Cream', 'Chocolate', 'Cake']
      -- NewListStream [[Ice Cream, Chocolate, Cake]] 
      -- ListAnalysisStream [true, true, false, Chocolate, 3]
      -- UpdatedListStream [[Ice Cream, Chocolate, Toffee]]
      
  • Null

    • define stream ProductInputStream (item string, price double);
      define table ProductInfoTable (item string, discount double);
      
      @info(name = 'Check-for-null')
      from ProductInputStream [not(item is null)]
      -- not(expression) returns the negated boolean result of the expression
      -- [ ... ] acts as a filter, i.e. an if condition on the stream
      -- <value> is null returns whether the value is null
      select item,
             price is null as isPriceNull
      
      insert into ProductValidationStream;
      
      @info(name = 'Outer-join-with-table')
      from ProductInputStream as s
          left outer join ProductInfoTable as t
          on s.item == t.item
      select s.item, s.price, t.discount,
             math:power(t.discount, 2) is null
                  as isFunctionReturnsNull,
      
             t is null as isTNull,
             s is null as isSNull,
      
             t.discount is null as isTDiscountNull,
             s.item is null as isSItemNull
      
      insert into DiscountValidationStream;
      -- math:power(base, exponent) raises the base to the given power
      -- ProductInputStream ['Cake', 12.0]
      -- ProductValidationStream [Cake, false]
      -- DiscountValidationStream [Cake, 12.0, null, true, true, false, true, false]
      

siddhi-event&data

event cleansing

  • value based filtering

    • define stream TemperatureStream (
       sensorId string, temperature double);
      
      @info(name = 'EqualsFilter')
      from TemperatureStream[ sensorId == 'A1234']
      select *
      insert into SenorA1234TemperatureStream;
      -- [ ... ] is a filter condition (like an if)
      
      @info(name = 'RangeFilter') 
      from TemperatureStream[ temperature > -2 and temperature < 40]
      select *
      insert into NormalTemperatureStream;
      -- comparison operators > and < combined with and
      
      @info(name = 'NullFilter') 
      from TemperatureStream[ sensorId is null ]
      
      select *
      insert into InValidTemperatureStream;
      -- is null
      
    • Constructs used: the [ ... ] filter, >, <, and, is null

  • if-then-else

    • define stream TemperatureStream 
              (sensorId string, temperature double);
      
      @info(name = 'SimpleIfElseQuery')
      from TemperatureStream
      select sensorId,
       ifThenElse(temperature > -2, 'Valid', 'InValid') as isValid 
      -- ifThenElse(condition, valueIfTrue, valueIfFalse) works like a ternary expression
      insert into ValidTemperatureStream;
      
      
      @info(name = 'ComplexIfElseQuery') 
      from TemperatureStream
      select sensorId, 
       ifThenElse(temperature > -2, 
              ifThenElse(temperature > 40, 'High', 'Normal'), 
              'InValid') 
          as tempStatus
      -- nested ternary expressions
      insert into ProcessedTemperatureStream;
      
    • ifThenElse(condition, valueIfTrue, valueIfFalse) works like a ternary expression

  • regex matching

    • define stream SweetProductionStream (name string, amount int);
      
      @info(name='ProcessSweetProductionStream')
      from SweetProductionStream
      select name, 
         regex:matches('chocolate(.*)', name) as isAChocolateProduct, 
         regex:group('.*\s(.*)', name, 1) as sweetType
      -- regex:matches(regex, string) returns true/false depending on whether the string matches the pattern
      -- regex:group(regex, string, index) matches the string against the regex and returns the capture group at the given index
      -- '.*\s(.*)': any characters, then a whitespace character, then any characters (captured as group 1)
      insert into ChocolateProductStream; 
      
      -- SweetProductionStream ['chocolate cake', 34]
      -- ChocolateProductStream ['chocolate cake', true, 'cake']
      
  • default

    • define stream PatientRegistrationInputStream (
                       seqNo long, name string, age int,
                       height float, weight double, photo object,
                       isEmployee bool, wardNo object);
      
      @info(name = 'SimpleIfElseQuery')
      from PatientRegistrationInputStream
      select 
       default(name, 'invalid') as name, 
       default(seqNo, 0l) as seqNo, 
       default(weight, 0d) as weight,
       default(age, 0) as age, 
       default(height, 0f) as height   
      -- default(attribute, defaultValue): if the attribute value is null, use the default value instead
      insert into PreprocessedPatientRegistrationInputStream;
      -- default values per type: ['invalid', 0, 0.0, 0, 0.0]
      
  • type based filtering

    • define stream SweetProductionStream (name string, amount int);
      
      @info(name='ProcessSweetProductionStream')
      from SweetProductionStream
      select 
         instanceOfInteger(amount) as isAIntInstance,
      
          name, 
          amount
      insert into ProcessedSweetProductionStream;
      -- the result of the type check is added to the event and can be used for filtering
      
  • remove duplicate events

    • define stream TemperatureStream
              (sensorId string, seqNo string, temperature double);
              
      @info(name = 'Deduplicate-sensorId')
      from TemperatureStream#unique:deduplicate(sensorId, 1 min)
      
      -- stream#unique:deduplicate(key, time_gap) drops events with the same key arriving within the given time gap; events arriving outside the gap are not treated as duplicates
      
      select *
      insert into UniqueSensorStream;
      @info(name = 'Deduplicate-sensorId-and-seqNo')
      from TemperatureStream#unique:deduplicate(
                              str:concat(sensorId,'-',seqNo), 1 min)
      -- str:concat(sensorId, '-', seqNo) concatenates the arguments into a single deduplication key
      select *
      insert into UniqueSensorSeqNoStream;  
      

data transformation

  • math & logical operation

    • define stream TemperatureStream
              (sensorId string, temperature double);
      @info(name = 'celciusTemperature')
      from TemperatureStream
      select sensorId, 
                      (temperature * 9 / 5) + 32 as temperature
      
      insert into FahrenheitTemperatureStream;
      @info(name = 'Overall-analysis')
      from FahrenheitTemperatureStream
      select sensorId, 
                      math:floor(temperature) as approximateTemp 
      -- math:floor(number) rounds down to the nearest integer
      insert all events into OverallTemperatureStream;
      @info(name = 'RangeFilter') 
      from OverallTemperatureStream
                    [ approximateTemp > -2 and approximateTemp < 40]
      
      select *
      insert into NormalTemperatureStream;
      
  • transform json

    • define stream InputStream(jsonString string);
      
      from InputStream 
      select json:toObject(jsonString) as jsonObj 
      insert into PersonalDetails;
      -- json:toObject(string) json->object
      
      from PersonalDetails
      select jsonObj, 
          json:getString(jsonObj,'$.name') as name,
          json:isExists(jsonObj, '$.salary') as isSalaryAvailable,
          json:toString(jsonObj) as jsonString 
      -- json:getString(obj, '$.name') returns the value at the given JSON path as a string
      -- json:isExists(obj, '$.salary') checks whether a value exists at the given JSON path
      -- json:toString(obj) returns the JSON string representation of the object
      insert into OutputStream;
      
      from OutputStream[isSalaryAvailable == false]
      select 
          json:setElement(jsonObj, '$', 0f, 'salary') as jsonObj
      -- json:setElement(obj, '$', 0f, 'salary') adds an element named 'salary' with the given value at the root of the object
      insert into PreprocessedStream;
      
      {
          "name" : "siddhi.user",
          "address" : {
              "country": "Sri Lanka",
          },
          "contact": "+9xxxxxxxx"
      }
      
      OutputStream
      [ {"address":{"country":"Sri Lanka"},"contact":"+9xxxxxxxx","name":"siddhi.user"}, siddhi.user, false,
      "{\"name\" : \"siddhi.user\", \"address\" : { \"country\": \"Sri Lanka\", }, \"contact\": \"+9xxxxxxxx\"}"]
      
      PreprocessedStream
      {
          "name" : "siddhi.user",
          "salary": 0.0
          "address" : {
              "country": "Sri Lanka",
          },
          "contact": "+9xxxxxxxx"
      }
      

data summarization

  • sliding time

    • define stream TemperatureStream
              (sensorId string, temperature double);
      
      @info(name = 'Overall-analysis')
      from TemperatureStream#window.time(1 min)
      -- window.time(duration): sliding window covering the events of the last 1 minute
      
      select avg(temperature) as avgTemperature,
             max(temperature) as maxTemperature,
             count() as numberOfEvents
      insert all events into OverallTemperatureStream;
      -- avg(number) 
      -- max(number)
      -- count() 
      -- insert all events into <stream>: outputs both current events and expired events (events leaving the window) into the stream
      
      @info(name = 'SensorId-analysis')
      from TemperatureStream#window.time(30 sec)
      select sensorId,
             avg(temperature) as avgTemperature,
             min(temperature) as minTemperature
      group by sensorId
      having avgTemperature > 20.0
      -- group by sensorId, with a having condition on the aggregated value
      insert into SensorIdTemperatureStream;
      -- per-sensor aggregation over the 30-second sliding window, filtered by the having clause
      
    • stream#function can be read as the function constraining or decorating the stream's behavior, e.g. deduplication or sliding-window aggregation.

  • batch (tumbling) time

    • define stream TemperatureStream
              (sensorId string, temperature double);
      
      @info(name = 'Overall-analysis')
      from TemperatureStream#window.timeBatch(1 min)
      select avg(temperature) as avgTemperature,
             max(temperature) as maxTemperature,
             count() as numberOfEvents
             
      -- stream#window.timeBatch(duration) groups events into non-overlapping (tumbling) batches of the given duration, starting from the first event that arrives
      insert into OverallTemperatureStream;
      
      @info(name = 'SensorId-analysis')
      from TemperatureStream#window.timeBatch(30 sec, 0)
      -- stream#window.timeBatch(duration, start_time) creates batches of the given duration aligned to the given start time
      select sensorId,
             avg(temperature) as avgTemperature,
             min(temperature) as minTemperature
      group by sensorId
      having avgTemperature > 20.0
      insert into SensorIdTemperatureStream;
      
      
    • The difference between window.time() and window.timeBatch(): the sliding window emits output continuously as events enter and leave the window, while the batch (tumbling) window emits output only when each batch interval ends.

  • sliding event count

    • define stream TemperatureStream
              (sensorId string, temperature double);
      
      @info(name = 'Overall-analysis')
      from TemperatureStream#window.length(4)
      -- stream#window.length(n) defines a sliding window that aggregates the last n events
      select avg(temperature) as avgTemperature,
             max(temperature) as maxTemperature,
             count() as numberOfEvents
      insert into OverallTemperatureStream;
      
      @info(name = 'SensorId-analysis')
      from TemperatureStream#window.length(5)
      select sensorId,
             avg(temperature) as avgTemperature,
             min(temperature) as minTemperature
      group by sensorId
      having avgTemperature >= 20.0
      insert into SensorIdTemperatureStream;
      
  • batch(tumbling) event count

    • define stream TemperatureStream
              (sensorId string, temperature double);
      
      @info(name = 'Overall-analysis')
      from TemperatureStream#window.lengthBatch(4)
      -- stream#window.lengthBatch(n) defines a tumbling window that aggregates every n events as one batch
      select avg(temperature) as avgTemperature,
             max(temperature) as maxTemperature,
             count() as numberOfEvents
      insert into OverallTemperatureStream;
      
      @info(name = 'SensorId-analysis')
      from TemperatureStream#window.lengthBatch(5)
      select sensorId,
             avg(temperature) as avgTemperature,
             min(temperature) as minTemperature
      group by sensorId
      having avgTemperature >= 20.0
      insert into SensorIdTemperatureStream;
      
  • session

    • define stream PurchaseStream
              (userId string, item string, price double);
      
      @info(name = 'Session-analysis')
      from PurchaseStream#window.session(1 min, userId)
      -- stream#window.session(session_gap, session_key) groups events per key into sessions separated by the given idle gap
      select userId,
             count() as totalItems,
             sum(price) as totalPrice
      group by userId
      insert into UserIdPurchaseStream;
      
      @info(name = 'Session-analysis-with-late-event-arrivals')
      from PurchaseStream#window.session(1 min, userId, 20 sec)
      -- aggregates events in a session window keyed by userId, with a 1-minute session gap and a 20-second latency allowance to capture late arrivals
      select userId,
             count() as totalItems,
             sum(price) as totalPrice
      group by userId
      insert into OutOfOrderUserIdPurchaseStream;
      
  • named window

    • define stream TemperatureStream
              (sensorId string, temperature double);
      
      define window OneMinTimeWindow
              (sensorId string, temperature double) time(1 min) ;
      -- define window <name> (<attributes>) <window type>(<parameters>): a named window that multiple queries can share
      @info(name = 'Insert-to-window')
      from TemperatureStream
      insert into OneMinTimeWindow;
      
      @info(name = 'Min-max-analysis')
      from OneMinTimeWindow
      select min(temperature) as minTemperature,
             max(temperature) as maxTemperature
      insert into MinMaxTemperatureOver1MinStream;
      
      @info(name = 'Per-sensor-analysis')
      from OneMinTimeWindow
      select sensorId,
             avg(temperature) as avgTemperature
      group by sensorId
      insert into AvgTemperaturePerSensorStream;
      

data pipelining

  • stream join

    • define stream TemperatureStream
              (roomNo string, temperature double);
      
      define stream HumidityStream
              (roomNo string, humidity double);
      
      @info(name = 'Equi-join')
      from TemperatureStream#window.unique:time(roomNo, 1 min) as t
          join HumidityStream#window.unique:time(roomNo, 1 min) as h
          on t.roomNo == h.roomNo
          
      -- stream#window.unique:time(key, duration) keeps only the latest unique event per key within the time window
      -- stream join
      select t.roomNo, t.temperature, h.humidity
      insert into TemperatureHumidityStream;
      
      @info(name = 'Join-on-temperature')
      from TemperatureStream as t
          left outer join HumidityStream#window.time(1 min) as h
          on t.roomNo == h.roomNo
      -- left outer join of stream t with the last 1 minute of events from stream h
      select t.roomNo, t.temperature, h.humidity
      insert into EnrichedTemperatureStream;
      
  • partition events by value

    • define stream LoginStream
              ( userID string, loginSuccessful bool);
      
      @purge(enable='true', interval='10 sec',
             idle.period='1 hour')
      partition with ( userID of LoginStream )
      -- @purge: every 10 seconds, check for partition instances that have received no events for 1 hour and purge them
      -- partition the events by userID
      begin
          @info(name='Aggregation-query')
          from LoginStream#window.length(3)
          select userID, loginSuccessful, count() as attempts
          group by loginSuccessful
          insert into #LoginAttempts;
      -- #LoginAttempts is an inner stream of the partition and can only be accessed inside it
      
          @info(name='Alert-query')
          from #LoginAttempts[loginSuccessful==false and attempts==3]
          select userID, "3 consecutive login failures!" as message
          insert into UserSuspensionStream;
      end;
      
      
  • scatter and gather(String)

    • define stream PurchaseStream
                      (userId string, items string, store string);
      
      @info(name = 'Scatter-query')
      from PurchaseStream#str:tokenize(items, ',', true)
      select userId, token as item, store
      insert into TokenizedItemStream;
      -- str:tokenize(input, separator, ...) splits the string at the separator and emits one event per token, exposed as the 'token' attribute
      
      @info(name = 'Transform-query')
      from TokenizedItemStream
      select userId, str:concat(store, "-", item) as itemKey
      insert into TransformedItemStream;
      -- str:concat()
      @info(name = 'Gather-query')
      from TransformedItemStream#window.batch()
      select userId, str:groupConcat(itemKey, ",") as itemKeys
      insert into GroupedPurchaseItemStream;
      -- str:groupConcat(value, separator) concatenates the values from all events in the window using the separator
      
  • scatter and gather(json)

    • define stream PurchaseStream
                      (order string, store string);
      
      @info(name = 'Scatter-query')
      from PurchaseStream#json:tokenize(order, '$.order.items')
      -- json:tokenize(json, '$.order.items') splits the JSON array at the given path and emits one event per element (as 'jsonElement')
      select json:getString(order, '$.order.id') as orderId, -- extract order.id
             jsonElement as item,
             store
      insert into TokenizedItemStream;
      
      @info(name = 'Transform-query')
      from TokenizedItemStream
      select orderId,
             ifThenElse(json:getString(item, 'name') == "cake",
                        json:toString(
                          json:setElement(item, 'price',
                            json:getDouble(item, 'price') - 5
                          )
                        ),
                        item) as item,
             store
      insert into DiscountedItemStream;
      
      @info(name = 'Gather-query')
      from DiscountedItemStream#window.batch()
      select orderId, json:group(item) as items, store
      insert into GroupedItemStream;
      -- json:group(item) aggregates the items from the window back into a JSON array
      
      @info(name = 'Format-query')
      from GroupedItemStream
      select str:fillTemplate("""
          {"discountedOrder":
              {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
          }""", orderId, items, store) as discountedOrder
      insert into DiscountedOrderStream;
      -- str:fillTemplate(template, args...) fills the {{n}} placeholders (like format()) and returns the assembled string
      

siddhi-others

  • simple pattern: matches one or more events arriving over time

    • define stream TemperatureStream(roomNo int, temp double);
      
      @sink(type = 'log')
      define stream HighTempAlertStream(roomNo int,
          initialTemp double, finalTemp double);
      
      @info(name='temperature-increase-identifier')
      from every( e1 = TemperatureStream ) ->
          e2 = TemperatureStream[ e1.roomNo == roomNo
              and (e1.temp + 5) <= temp ]
          within 10 min
      select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
      insert into HighTempAlertStream;
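      -- every(e1=TemperatureStream) restarts matching for each arriving event; '->' means "followed by":
      -- e2 is a later event from the same room whose temp is at least 5 degrees higher, arriving within 10 minutes of e1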
      
  • count pattern: matches multiple events received under the same condition

    • define stream TemperatureStream (sensorID long, roomNo int,
          temp double);
      
      define stream RegulatorStream (deviceID long, roomNo int,
          tempSet double, isOn bool);
      
      @sink(type = 'log')
      define stream TemperatureDiffStream(roomNo int,
          tempDiff double);
      
      from every( e1 = RegulatorStream)
          -> e2 = TemperatureStream[e1.roomNo == roomNo] < 1: >
          -> e3 = RegulatorStream[e1.roomNo == roomNo]
      select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
      insert into TemperatureDiffStream;
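      -- e2 with <1:> matches one or more temperature events for the same room collected between the two
      -- regulator events e1 and e3; e2[0] is the first matched event and e2[last] the latest one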
      
      
  • logical pattern: matches events based on their arrival order and logical relationships

    • define stream RegulatorStateChangeStream(deviceID long,
          roomNo int, tempSet double, action string);
      
      define stream RoomKeyStream(deviceID long, roomNo int,
          action string);
      
      @sink(type='log')
      define stream RegulatorActionStream(roomNo int, action string);
      
      from every e1=RegulatorStateChangeStream[ action == 'on' ]
           -> e2=RoomKeyStream
                  [ e1.roomNo == roomNo and action == 'removed' ]
              or e3=RegulatorStateChangeStream
                  [ e1.roomNo == roomNo and action == 'off']
      select e1.roomNo,
          ifThenElse( e2 is null, 'none', 'stop' ) as action
      having action != 'none'
      insert into RegulatorActionStream;
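      -- 'or' matches whichever happens first after the regulator turns on: the room key being removed (e2)
      -- or the regulator being switched off (e3); if e3 matched, e2 is null, the action becomes 'none'
      -- and the having clause filters it out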
      
      
  • non-occurrence pattern: detects events that are expected but do not occur within a given period

    • define stream RegulatorStateChangeStream(deviceID long,
          roomNo int, tempSet double, action string);
      
      define stream TemperatureStream (roomNo int, temp double);
      
      @sink(type='log')
      define stream RoomTemperatureAlertStream(roomNo int);
      
      from e1=RegulatorStateChangeStream[action == 'on']
           -> not TemperatureStream[e1.roomNo == roomNo and
              temp <= e1.tempSet] for 30 sec
      select e1.roomNo as roomNo
      insert into RoomTemperatureAlertStream;
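      -- not <condition> for 30 sec: matches when no temperature reading at or below the set value arrives
      -- from that room within 30 seconds after the regulator is switched on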
      
      
  • simple sequence: matches consecutive events arriving one after another over time

    • define stream StockRateStream (symbol string, price float,
          volume int);
      
      @sink(type='log')
      define stream PeakStockRateStream (symbol string,
          rateAtPeak float);
      
      partition with (symbol of StockRateStream)
      begin
      
          from every e1=StockRateStream,
              e2=StockRateStream[e1.price < price],
              e3=StockRateStream[e2.price > price]
              within 10 min
          select e1.symbol, e2.price as rateAtPeak
          insert into PeakStockRateStream ;
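          -- e1, e2, e3 separated by commas form a sequence: consecutive events of the stream (within this
          -- symbol's partition), detecting a price peak (rise at e2, first drop at e3) within 10 minutes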
      end;
      
      
  • sequence with count: a sequence in which a condition can match a counted number of consecutive events

    • define stream TemperatureStream(roomNo int, temp double);
      
      @sink(type='log') 
      define stream PeekTemperatureStream(roomNo int,
          initialTemp double, peekTemp double, firstDropTemp double);
      
      partition with (roomNo of TemperatureStream)
      begin
          @info(name = 'temperature-trend-analyzer')
          from every e1=TemperatureStream,
               e2=TemperatureStream[ifThenElse(e2[last].temp is null,
                      e1.temp <= temp, e2[last].temp <= temp)]+,
               e3=TemperatureStream[e2[last].temp > temp]
          select e1.roomNo, e1.temp as initialTemp,
              e2[last].temp as peekTemp, e3.temp as firstDropTemp
      
          insert into PeekTemperatureStream ;
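          -- e2[...]+ matches one or more consecutive events while the temperature is non-decreasing (each new
          -- event is compared with the previously matched e2, or with e1 for the first one); e2[last] is the
          -- latest matched event, and e3 matches the first event where the temperature drops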
      end;
      
      
  • logical sequence: matches consecutive events that arrive in logical combinations (and/or)

    • define stream TempSensorStream(deviceID long,
          isActive bool);
      
      define stream HumidSensorStream(deviceID long,
          isActive bool);
      
      define stream RegulatorStream(deviceID long, isOn bool);
      
      @sink(type='log')
      define stream StateNotificationStream (deviceID long,
          tempSensorActive bool, humidSensorActive bool);
      
      from every e1=RegulatorStream[isOn == true],
          e2=TempSensorStream and e3=HumidSensorStream
      select e1.deviceID, e2.isActive as tempSensorActive,
          e3.isActive as humidSensorActive
      insert into StateNotificationStream;
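      -- 'and' in a sequence: right after e1, one event from TempSensorStream and one from HumidSensorStream
      -- must arrive (in either order) for the match to complete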
      

service integration

  • HTTP Service Integration

    • @sink(type='http-call',
          publisher.url='http://localhost:8005/validate-loan',
          method='POST', sink.id='loan-validation',
          @map(type='json'))
      define stream LoanValidationStream (clientId long,
                          name string, amount double);
      -- http-call sink that POSTs the events as JSON to the given publisher.url
      @source(type='http-call-response', sink.id='loan-validation',
          http.status.code='2\d+',
          @map(type='json', @attributes(customerName='trp:name',
              clientId='trp:clientId', loanAmount='trp:amount',
              interestRate='validation-response.rate',
              totalYears='validation-response.years-approved')))
      define stream SuccessLoanRequestStream(clientId long,
                 customerName string, loanAmount double,
                 interestRate double, totalYears int);
      -- http-call-response source handling only responses whose status code starts with 2 (2xx)
      @source(type='http-call-response', sink.id='loan-validation',
          http.status.code='400',
          @map(type='json', @attributes(customerName='trp:name',
              clientId='trp:clientId',
              failureReason='validation-response.reason')))
      define stream FailureLoanRequestStream(clientId long,
                      customerName string, failureReason string);
      -- http-call-response source handling only responses with status code 400
      define stream LoanRequestStream (clientId long, name string,
                      amount double);
      
      @sink(type='log') 
      define stream LoanResponseStream(clientId long,
                      customerName string, message string);
      -- log sink
      @info(name = 'attribute-projection')
      from LoanRequestStream
      select clientId, name, amount
      insert into LoanValidationStream;
      
      @info(name = 'successful-message-generator')
      from SuccessLoanRequestStream
      select clientId, customerName,
          "Loan Request is accepted for processing" as message
      insert into LoanResponseStream;
      
      @info(name = 'failure-message-generator')
      from FailureLoanRequestStream
      select clientId, customerName,
              str:concat("Loan Request is rejected due to ",
                  failureReason) as message
      insert into LoanResponseStream;
      
    • Separate sources are defined for successful (2xx) and failed (400) responses of the HTTP validation service; both are mapped from JSON, and the resulting messages are published via the log sink

  • gRPC service integration

    • define stream TicketBookingStream (name string, phoneNo string,
              movie string, ticketClass string, qty int,
              bookingTime long);
      
      @sink(type='grpc-call',
          publisher.url =
          'grpc://localhost:5003/org.wso2.grpc.EventService/process',
          sink.id= 'ticket-price', @map(type='json'))
      define stream TicketPriceFinderStream (name string,
              phoneNo string, movie string, ticketClass string,
              qty int, bookingTime long);
      -- grpc-call sink
      @source(type='grpc-call-response',
          receiver.url =
          'grpc://localhost:9763/org.wso2.grpc.EventService/process',
          sink.id= 'ticket-price',
          @map(type='json', @attributes(customerName='trp:name',
              phoneNo='trp:phoneNo', movie='trp:movie',
              qty='trp:qty', bookingTime='trp:bookingTime',
              ticketPrice='price')))
      -- grpc-call-response source
      define stream TicketPriceResponseStream (customerName string,
              phoneNo string, movie string, qty int,
              ticketPrice double, bookingTime long);
      
      @sink(type='log')
      define stream TotalTicketPaymentStream (customerName string,
              phoneNo string, movie string, totalAmount double,
              bookingTime long);
      -- log sink
      @info(name = 'filter-basic-ticket-bookings')
      from TicketBookingStream[ticketClass == "BASIC"]
      select name as customerName, phoneNo, movie,
          qty * 20.0 as totalAmount, bookingTime
      insert into TotalTicketPaymentStream;
      
      @info(name = 'filter-non-basic-tickets')
      from TicketBookingStream[ticketClass != "BASIC"]
      select *
      insert into TicketPriceFinderStream;
      
      @info(name = 'total-price-calculator')
      from TicketPriceResponseStream
      select customerName, phoneNo, movie,
          (qty * ticketPrice) as totalAmount, bookingTime
      insert into TotalTicketPaymentStream;
      
      

rate limiting

  • rate limit based on time

    • define stream APIRequestStream (apiName string, version string,
          tier string, user string, userEmail string);
      define stream UserNotificationStream (user string,
          apiName string, version string, tier string,
          userEmail string, throttledCount long);
      @info(name='api-throttler')
      from APIRequestStream#window.timeBatch(1 min, 0, true) 
      select apiName, version, user, tier, userEmail,
          count() as totalRequestCount
      group by apiName, version, user
      having totalRequestCount == 3 or totalRequestCount == 0
      insert all events into ThrottledStream;
      
      @info(name='throttle-flag-generator') 
      from ThrottledStream 
      select apiName, version, user, tier, userEmail,
          ifThenElse(totalRequestCount == 0, false, true)
              as isThrottled
      insert into ThrottleOutputStream;
      
      @info(name='notification-generator') 
      from ThrottleOutputStream[isThrottled]#window.time(1 hour) 
      select user, apiName, version, tier, userEmail,
          count() as throttledCount
      group by user, apiName, version, tier
      having throttledCount > 2
      output first every 15 min 
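      -- output first every 15 min: output rate limiting, emit only the first matching event in each 15-minute period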
      insert into UserNotificationStream;
      

error handling

  • logging

    • define stream GlucoseReadingStream (locationRoom string,
          locationBed string, timeStamp string, sensorID long,
          patientFirstName string, patientLastName string,
          sensorValue double);
      
      @sink(type = 'http', on.error='log',
          publisher.url = "http://localhost:8080/logger",
          method = "POST",
          @map(type = 'json'))
      -- on.error='log': if publishing fails, log the erroneous event and the error
      define stream AbnormalGlucoseReadingStream
          (timeStampInLong long, locationRoom string,
          locationBed string, sensorID long,
          patientFullName string, sensorReadingValue double);
      
      @info(name='abnormal-reading-identifier')
      from GlucoseReadingStream[sensorValue > 220]
      select math:parseLong(timeStamp) as timeStampInLong,
          locationRoom, locationBed, sensorID,
          str:concat(patientFirstName, " ", patientLastName)
              as patientFullName,
              sensorValue as sensorReadingValue
      insert into AbnormalGlucoseReadingStream;
      
  • wait&retry

    • define stream GlucoseReadingStream (locationRoom string,
          locationBed string, timeStamp string, sensorID long,
          patientFirstName string, patientLastName string,
          sensorValue double);
      
      @sink(type = 'http', on.error='wait',
          publisher.url = "http://localhost:8080/logger",
          method = "POST",
          @map(type = 'json'))
      -- on.error='wait': if publishing fails, wait and keep retrying until the endpoint is reachable again
      -- on.error configures how publishing errors are handled
      define stream AbnormalGlucoseReadingStream
          (timeStampInLong long, locationRoom string,
          locationBed string, sensorID long,
          patientFullName string, sensorReadingValue double);
      
      @info(name='abnormal-reading-identifier')
      from GlucoseReadingStream[sensorValue > 220]
      select math:parseLong(timeStamp) as timeStampInLong,
          locationRoom, locationBed, sensorID,
          str:concat(patientFirstName, " ", patientLastName)
              as patientFullName,
              sensorValue as sensorReadingValue
      insert into AbnormalGlucoseReadingStream;
      
