Storm中如何进行Librato的Metric度量的实现,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
成都网站建设哪家好,找创新互联公司!专注于网页设计、成都网站建设、微信开发、微信小程序开发、集团成都定制网站等服务项目。核心团队均拥有互联网行业多年经验,服务众多知名企业客户;涵盖的客户类型包括:玻璃贴膜等众多领域,积累了大量丰富的经验,同时也获得了客户的一致认可!
辐射性质介绍一个Librato的Metric度量的实现
package com.digitalpebble.storm.crawler; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.TopologyContext; import com.librato.metrics.HttpPoster; import com.librato.metrics.HttpPoster.Response; import com.librato.metrics.LibratoBatch; import com.librato.metrics.NingHttpPoster; import com.librato.metrics.Sanitizer; import com.librato.metrics.Versions; /** Sends the metrics to Librato **/ public class LibratoMetricsConsumer implements IMetricsConsumer { public static final int DEFAULT_BATCH_SIZE = 500; private static final Logger LOG = LoggerFactory .getLogger(LibratoMetricsConsumer.class); private static final String LIB_VERSION = Versions.getVersion( "META-INF/maven/com.librato.metrics/librato-java/pom.properties", LibratoBatch.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final Sanitizer sanitizer = new Sanitizer() { public String apply(String name) { return Sanitizer.LAST_PASS.apply(name); } }; private int postBatchSize = DEFAULT_BATCH_SIZE; private long timeout = 30; private final TimeUnit timeoutUnit = TimeUnit.SECONDS; private String userAgent = null; private HttpPoster httpPoster; private SetmetricsToKeep = new HashSet (); public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { // TODO configure timeouts // this.timeout = timeout; // this.timeoutUnit = timeoutUnit; // this.postBatchSize = postBatchSize; String agentIdentifier = (String) stormConf.get("librato.agent"); if (agentIdentifier == null) agentIdentifier = "storm"; String token = (String) stormConf.get("librato.token"); String username = (String) stormConf.get("librato.username"); String apiUrl = (String) stormConf.get("librato.api.url"); if (apiUrl == null) apiUrl = "https://metrics-api.librato.com/v1/metrics"; // check that the values are not null if (StringUtils.isBlank(token)) throw new RuntimeException("librato.token not set"); if (StringUtils.isBlank(username)) throw new RuntimeException("librato.username not set"); this.userAgent = String.format("%s librato-java/%s", agentIdentifier, LIB_VERSION); this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl); // get the list of metrics names to keep if any String metrics2keep = (String) stormConf.get("librato.metrics.to.keep"); if (metrics2keep != null) { String[] mets = metrics2keep.split(","); for (String m : mets) metricsToKeep.add(m.trim().toLowerCase()); } } // post(String source, long epoch) public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { final Map payloadMap = new HashMap (); payloadMap.put("source", taskInfo.srcComponentId + "_" + taskInfo.srcWorkerHost + "_" + taskInfo.srcTaskId); payloadMap.put("measure_time", taskInfo.timestamp); final List
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。