A Look at SkyWalking's JVMService
2020/2/24 17:03:06
Preface
This article takes a look at SkyWalking's JVMService.
BootService
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java
```java
public interface BootService {
    void prepare() throws Throwable;

    void boot() throws Throwable;

    void onComplete() throws Throwable;

    void shutdown() throws Throwable;
}
```
- BootService defines four methods: prepare, boot, onComplete, and shutdown.
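To make the four lifecycle methods concrete, here is a minimal sketch of how a caller might drive them. The phase order used here (prepare, then boot, then onComplete, with shutdown last) is an assumption inferred from the method names, not something the excerpt above confirms. `LifecycleService`, `RecordingService`, and `runLifecycle` are hypothetical names introduced only for this illustration; they are not SkyWalking classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same four-method shape as BootService.
interface LifecycleService {
    void prepare() throws Throwable;

    void boot() throws Throwable;

    void onComplete() throws Throwable;

    void shutdown() throws Throwable;
}

class LifecycleDemo {
    // Records every lifecycle call so the resulting order can be inspected.
    static final class RecordingService implements LifecycleService {
        private final String name;
        private final List<String> log;

        RecordingService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void prepare() { log.add(name + ".prepare"); }
        @Override public void boot() { log.add(name + ".boot"); }
        @Override public void onComplete() { log.add(name + ".onComplete"); }
        @Override public void shutdown() { log.add(name + ".shutdown"); }
    }

    // Assumed driver: finish one phase for every service before starting the next.
    static List<String> runLifecycle(String... names) {
        List<String> log = new ArrayList<>();
        List<LifecycleService> services = new ArrayList<>();
        for (String name : names) {
            services.add(new RecordingService(name, log));
        }
        try {
            for (LifecycleService s : services) s.prepare();
            for (LifecycleService s : services) s.boot();
            for (LifecycleService s : services) s.onComplete();
            for (LifecycleService s : services) s.shutdown();
        } catch (Throwable t) {
            // The interface allows any Throwable, so the driver has to catch it.
            throw new IllegalStateException("lifecycle phase failed", t);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle("a", "b"));
    }
}
```

Running each phase across all services before moving on means a service can rely in boot on anything another service set up in prepare, which is one plausible reason to split startup into separate methods.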
JVMService
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
```java
@DefaultImplementor
public class JVMService implements BootService, Runnable {
    private static final ILog logger = LogManager.getLogger(JVMService.class);
    private LinkedBlockingQueue<JVMMetric> queue;
    private volatile ScheduledFuture<?> collectMetricFuture;
    private volatile ScheduledFuture<?> sendMetricFuture;
    private Sender sender;

    @Override
    public void prepare() throws Throwable {
        queue = new LinkedBlockingQueue<JVMMetric>(Config.Jvm.BUFFER_SIZE);
        sender = new Sender();
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
    }

    @Override
    public void boot() throws Throwable {
        collectMetricFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("JVMService produces metrics failure.", t);
                }
            }), 0, 1, TimeUnit.SECONDS);
        sendMetricFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("JVMService consumes and upload failure.", t);
                }
            }
            ), 0, 1, TimeUnit.SECONDS);
    }

    @Override
    public void onComplete() throws Throwable {

    }

    @Override
    public void shutdown() throws Throwable {
        collectMetricFuture.cancel(true);
        sendMetricFuture.cancel(true);
    }

    @Override
    public void run() {
        if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
            && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
        ) {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();
                jvmBuilder.setTime(currentTimeMillis);
                jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());
                jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());
                jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList());
                jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());

                JVMMetric jvmMetric = jvmBuilder.build();
                if (!queue.offer(jvmMetric)) {
                    queue.poll();
                    queue.offer(jvmMetric);
                }
            } catch (Exception e) {
                logger.error(e, "Collect JVM info fail.");
            }
        }
    }

    //......
}
```
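- JVMService implements both BootService and Runnable. Its prepare method creates a LinkedBlockingQueue of JVMMetric bounded by Config.Jvm.BUFFER_SIZE, creates a Sender, and registers that Sender with GRPCChannelManager as a channel listener. Its boot method schedules two fixed-rate tasks, collectMetricFuture and sendMetricFuture, each on its own single-thread executor with a 1-second period. Its shutdown method calls collectMetricFuture.cancel(true) and sendMetricFuture.cancel(true). Its run method, once SERVICE_ID and SERVICE_INSTANCE_ID have been assigned, builds a JVMMetric from CPU, memory, memory pool, and GC data and offers it to the queue; if the queue is full, it polls the oldest entry and offers again.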
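The offer/poll/offer sequence in run() gives the queue drop-oldest behavior when it is full. The sketch below isolates that pattern on a plain LinkedBlockingQueue of integers; `DropOldestBufferDemo`, `offerDroppingOldest`, and `fill` are hypothetical names used only here, and the capacity of 3 stands in for Config.Jvm.BUFFER_SIZE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class DropOldestBufferDemo {
    // Same offer/poll/offer sequence as JVMService.run(), on a generic queue.
    static <T> void offerDroppingOldest(LinkedBlockingQueue<T> queue, T item) {
        if (!queue.offer(item)) {
            queue.poll();      // queue is full: discard the oldest element
            queue.offer(item); // then retry with the newest one
        }
    }

    // Pushes 1..count into a queue of the given capacity and returns what is left.
    static List<Integer> fill(int capacity, int count) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 1; i <= count; i++) {
            offerDroppingOldest(queue, i);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(fill(3, 5)); // prints [3, 4, 5]
    }
}
```

The producer never blocks with this approach: if the consumer falls behind, the oldest samples are lost and the most recent ones are kept.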
Sender
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
```java
private class Sender implements Runnable, GRPCChannelListener {
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile JVMMetricReportServiceGrpc.JVMMetricReportServiceBlockingStub stub = null;

    @Override
    public void run() {
        if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
            && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
        ) {
            if (status == GRPCChannelStatus.CONNECTED) {
                try {
                    JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder();
                    LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
                    queue.drainTo(buffer);
                    if (buffer.size() > 0) {
                        builder.addAllMetrics(buffer);
                        builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
                        Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
                        ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                    }
                } catch (Throwable t) {
                    logger.error(t, "send JVM metrics to Collector fail.");
                }
            }
        }
    }

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            stub = JVMMetricReportServiceGrpc.newBlockingStub(channel);
        }
        this.status = status;
    }
}
```
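- Sender implements Runnable and GRPCChannelListener. When status is GRPCChannelStatus.CONNECTED, its run method calls queue.drainTo(buffer); if the buffer is not empty, it builds a JVMMetricCollection and sends it with stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(...), then hands the returned Commands to ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands). Its statusChanged method records the new status and, when GRPCChannelStatus.CONNECTED.equals(status), creates the stub via JVMMetricReportServiceGrpc.newBlockingStub(channel).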
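The consumer side can be sketched without gRPC to show the two gates in Sender.run(): nothing is drained while the channel is disconnected, and an empty drain sends nothing. In this sketch the remote call is replaced by appending the batch to an in-memory list, which is an assumption made purely for illustration; `BatchSenderDemo`, `BatchSender`, `Status`, and `sentBatches` are hypothetical names, not SkyWalking APIs.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class BatchSenderDemo {
    // Hypothetical two-state stand-in for GRPCChannelStatus.
    enum Status { CONNECTED, DISCONNECT }

    static final class BatchSender implements Runnable {
        private final LinkedBlockingQueue<String> queue;
        // Stand-in for the remote collector: each "sent" batch is stored here.
        private final List<List<String>> sent = new ArrayList<>();
        private volatile Status status = Status.DISCONNECT;

        BatchSender(LinkedBlockingQueue<String> queue) {
            this.queue = queue;
        }

        void statusChanged(Status newStatus) {
            this.status = newStatus;
        }

        @Override
        public void run() {
            if (status != Status.CONNECTED) {
                return; // leave items queued until the channel is usable
            }
            List<String> buffer = new LinkedList<>();
            queue.drainTo(buffer); // moves every queued item into the batch
            if (!buffer.isEmpty()) {
                sent.add(buffer); // one call per batch instead of one per item
            }
        }

        List<List<String>> sentBatches() {
            return sent;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        queue.offer("m1");
        queue.offer("m2");
        BatchSender sender = new BatchSender(queue);
        sender.run();                           // disconnected: nothing is sent
        sender.statusChanged(Status.CONNECTED);
        sender.run();                           // sends one batch with both items
        sender.run();                           // queue is empty: nothing is sent
        System.out.println(sender.sentBatches()); // prints [[m1, m2]]
    }
}
```

Because items stay in the queue while disconnected, the drop-oldest policy on the producer side is what bounds memory during an outage.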
RunnableWithExceptionProtection
skywalking-6.6.0/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/RunnableWithExceptionProtection.java
```java
public class RunnableWithExceptionProtection implements Runnable {
    private Runnable run;
    private CallbackWhenException callback;

    public RunnableWithExceptionProtection(Runnable run, CallbackWhenException callback) {
        this.run = run;
        this.callback = callback;
    }

    @Override
    public void run() {
        try {
            run.run();
        } catch (Throwable t) {
            callback.handle(t);
        }
    }

    public interface CallbackWhenException {
        void handle(Throwable t);
    }
}
```
- RunnableWithExceptionProtection implements Runnable. Its constructor takes a Runnable and a CallbackWhenException; its run method catches any Throwable thrown by run.run() and passes it to the callback's handle method.
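One likely reason for wrapping the scheduled tasks this way is that ScheduledExecutorService.scheduleAtFixedRate suppresses all subsequent executions once a task throws. The sketch below compares a task that always throws with and without a wrapper of the same shape. `ExceptionProtectionDemo`, `protect`, and the two measuring methods are hypothetical names for this illustration, and the 10 ms period is chosen only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ExceptionProtectionDemo {
    // Minimal wrapper with the same shape as RunnableWithExceptionProtection.
    static Runnable protect(Runnable task, Consumer<Throwable> callback) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                callback.accept(t);
            }
        };
    }

    // Schedules an always-failing task directly and returns how many times it ran.
    static int runsWithoutProtection() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            throw new IllegalStateException("boom");
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            future.get(5, TimeUnit.SECONDS); // fails once the first run has thrown
        } catch (ExecutionException | TimeoutException expected) {
            // expected: the periodic task ended with an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    // Schedules the same failing task behind the wrapper and waits for three runs.
    static boolean keepsRunningWithProtection() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable failing = () -> {
            threeRuns.countDown();
            throw new IllegalStateException("boom");
        };
        // A real callback would log the Throwable; here it is simply ignored.
        executor.scheduleAtFixedRate(protect(failing, t -> { }), 0, 10, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs without protection: " + runsWithoutProtection());
        System.out.println("keeps running with protection: " + keepsRunningWithProtection());
    }
}
```

Without the wrapper the task runs exactly once and is then silently dropped by the scheduler; with it, each failure reaches the callback and the next execution still happens.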
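Summary
JVMService implements both BootService and Runnable. Its prepare method creates a LinkedBlockingQueue of JVMMetric and a Sender, and registers the Sender with GRPCChannelManager. Its boot method schedules two tasks, collectMetricFuture and sendMetricFuture, both with a 1-second period. Its shutdown method calls collectMetricFuture.cancel(true) and sendMetricFuture.cancel(true). Its run method builds a JVMMetric and adds it to the queue, from which the Sender drains and uploads batches.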