最近在跑flink社区1.15版本使用json_value函数时,发现其性能很差,通过jstack查看堆栈经常在执行以下堆栈
专注于为中小企业提供成都网站设计、成都网站制作服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业霍山免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。可以看到这里的逻辑是在等锁,查看jsonpath的LRUCache
// | |
// Source code recreated from a .class file by IntelliJ IDEA | |
// (powered by FernFlower decompiler) | |
// | |
package org.apache.flink.table.shaded.com.jayway.jsonpath.spi.cache; | |
import java.util.Deque; | |
import java.util.LinkedList; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.locks.ReentrantLock; | |
import org.apache.flink.table.shaded.com.jayway.jsonpath.JsonPath; | |
public class LRUCache implements Cache { | |
private final ReentrantLock lock = new ReentrantLock(); | |
private final Map | |
private final Deque | |
private final int limit; | |
public LRUCache(int limit) { | |
this.limit = limit; | |
} | |
public void put(String key, JsonPath value) { | |
JsonPath oldValue = (JsonPath)this.map.put(key, value); | |
if (oldValue != null) { | |
this.removeThenAddKey(key); | |
} else { | |
this.addKey(key); | |
} | |
if (this.map.size() >this.limit) { | |
this.map.remove(this.removeLast()); | |
} | |
} | |
public JsonPath get(String key) { | |
JsonPath jsonPath = (JsonPath)this.map.get(key); | |
if (jsonPath != null) { | |
this.removeThenAddKey(key); | |
} | |
return jsonPath; | |
} | |
private void addKey(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.addFirst(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
private String removeLast() { | |
this.lock.lock(); | |
String var2; | |
try { | |
String removedKey = (String)this.queue.removeLast(); | |
var2 = removedKey; | |
} finally { | |
this.lock.unlock(); | |
} | |
return var2; | |
} | |
private void removeThenAddKey(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.removeFirstOccurrence(key); | |
this.queue.addFirst(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
private void removeFirstOccurrence(String key) { | |
this.lock.lock(); | |
try { | |
this.queue.removeFirstOccurrence(key); | |
} finally { | |
this.lock.unlock(); | |
} | |
} | |
... | |
} | |
可以看到get操作时,如果获取到的是有值的,那么会更新相应key的数据从双端队列移到首位,借此来实现LRU的功能,但是这样每次get和put操作都是需要加锁的,因此并发情况下吞吐就会比较低,也会导致cpu使用效率较低。
从jsonpath社区查看相应的问题,也有相关的反馈
Performance bottlenecks · Issue #740 · json-path/JsonPath · GitHub
Fix JSONPath cache inefficient issue by Ferrari6 · Pull Request #7409 · apache/pinot · GitHub
比较方便的是,jsonpath 提供了spi的方式可以自定义的设置Cache的实现类,可以通过以下方式来设置新的cache实现。
static { | |
CacheProvider.setCache(new JsonPathCache()); | |
} |
从pinot的实现中,我们看到他是用了guava的cache来替换了默认的LRUCache实现,那么这样实现性能优化有多少呢,这里我们是用java的性能测试框架jmh来测试下性能提升的情况
性能测试这里为了方便,直接在flink-benchmark工程里添加了两个benchmark的测试类.
GuavaCache
LRUCache
这里面需要注意,因为cache是进程级别共享的,所以我们需要将设置@State(Benchmark)
级别,这样我们构建的cache就是进程级别共享,而不是线程级别共享的。
写的测试是4个线程运行,缓存大小均为400
为了避免在本机运行时受本机的其他程序影响,最好是build jar之后放到服务器上跑
java -jar target/benchmarks.jar -rf csv org.apache.flink.benchmark.GuavaCacheBenchmark |
得到一个测试结果
Benchmark Mode Cnt Score Error Units | |
GuavaCacheBenchmark.get thrpt 30 4480.563 ± 203.311 ops/ms | |
GuavaCacheBenchmark.put thrpt 30 1774.769 ± 119.198 ops/ms | |
LRUCacheBenchmark.get thrpt 30 441.239 ± 2.812 ops/ms | |
LRUCacheBenchmark.put thrpt 30 350.549 ± 12.285 ops/ms |
可以看到使用guava的cache后,get性能提升8倍左右,put性能提升5倍左右。
这块性能提升的主要来源是cache的实现机制上,和caffeine 的作者在github上也简单了解了下相关的推荐实现
后面会写一篇文章来专门分析下caffeine cache的优化实现。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧