#眉標=雲端運算
#副標=開放原始碼的雲端運算平台技術(10)
#大標=Pig實戰–Twitter背後的資料分析機制
#作者=文/圖 沈炳宏

========程式1 ===============
-- Inserts into summary one row per ip: the ip and the number of rows produced
-- by joining app (on email) with users that have a click whose value is > 0.
insert into summary
select ip, count(*)
from app
join (
    -- No statement terminator inside the derived table: a ';' here would end
    -- the whole statement early.
    select name, email
    from users
    join clicks on (users.name = clicks.user)
    where value > 0
) active_users using (email)
group by ip;
=======================

========程式2 ===============
-- Loads users and clicks, keeps clicks whose value is > 0, joins them to users
-- by name, and stores the result under 'summary'.
Users = load 'users' as (name, email);
Clicks = load 'clicks' as (user, url, value);
RealClicks = filter Clicks by value > 0;
UserClicks = join Users by name, RealClicks by user;
-- NOTE(review): UserClicks is produced by a join, not a group, so a `group`
-- field is presumably not available here; confirm the intended projection.
summary = foreach UserClicks generate group;
-- String literals must use plain ASCII single quotes.
store summary into 'summary';
=======================

========程式3 ===============
/**
 * Counts how many actions occur in a session before the first "purchase".
 *
 * Field 0 of the input is read as a bag of actions and field 1 as a bag of
 * timestamps; the two bags are walked in lockstep. Whenever a timestamp is
 * more than sessionTimeout after the previous one, the step counter restarts.
 * sessionTimeout is defined outside this listing.
 *
 * @param input tuple holding the action bag (field 0) and timestamp bag (field 1)
 * @return the number of steps counted before the first "purchase" action, or
 *         null when either bag is empty or no purchase is found
 * @throws IOException if a field cannot be read from a tuple
 */
public Integer exec(Tuple input) throws IOException {
    DataBag actions = (DataBag) input.get(0);
    DataBag timestamps = (DataBag) input.get(1);
    if (actions.size() == 0 || timestamps.size() == 0) {
        return null;
    }

    // Typed iterators: with a raw Iterator, next() yields Object and the
    // get(0) calls below would not compile.
    Iterator<Tuple> aIter = actions.iterator();
    Iterator<Tuple> tIter = timestamps.iterator();
    long lastTimestamp = 0;
    int countedSteps = 0;
    while (aIter.hasNext() && tIter.hasNext()) {
        String action = (String) aIter.next().get(0);
        Long timestamp = (Long) tIter.next().get(0);
        // A gap longer than the timeout starts a new session.
        if (timestamp > lastTimestamp + sessionTimeout) {
            countedSteps = 0;
        }
        lastTimestamp = timestamp;
        // Constant-first comparison so a null action counts as an ordinary
        // step instead of throwing NullPointerException.
        if ("purchase".equals(action)) {
            return countedSteps;
        }
        countedSteps++;
    }
    return null;
}
=======================

========程式4 ===============
-- Make the SessionAnalysis function available to this script.
register SessionAnalysis.jar;
views = load 'views' as (name:chararray, timestamp:long, action:chararray, purchase:double);
-- Group each name's views, using 4 parallel reduce tasks.
grouped = group views by name parallel 4;
sessions = foreach grouped {
    -- Order each name's views by timestamp before passing them on.
    sorted = order views by timestamp;
    generate group, SessionAnalysis(sorted.action, sorted.timestamp) as steps;
}
-- Count how many names share each steps value.
bysteps = group sessions by steps;
results = foreach bysteps generate group, COUNT(sessions);
store results into 'results/session';
=======================