A Look at SkyWalking's TraceSegmentServiceClient
2020/3/3 17:01:34
Preface
This article walks through SkyWalking's TraceSegmentServiceClient, based on the 6.6.0 agent source.
TracingContextListener
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java
public interface TracingContextListener {
    void afterFinished(TraceSegment traceSegment);
}
- TracingContextListener defines a single method, afterFinished, which receives the finished TraceSegment
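To make the callback concrete, here is a minimal, self-contained sketch of the listener pattern. It does not use the SkyWalking classes: SegmentStub, SegmentListener, and SegmentListenerRegistry are hypothetical names introduced only for illustration, and the registry is merely an assumption about how such listeners are typically managed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for TraceSegment; only an id is kept.
class SegmentStub {
    final String id;

    SegmentStub(String id) {
        this.id = id;
    }
}

// Same shape as TracingContextListener: one callback per finished segment.
interface SegmentListener {
    void afterFinished(SegmentStub segment);
}

// Hypothetical registry: listeners can be added or removed at runtime,
// and every registered listener is notified when a segment finishes.
class SegmentListenerRegistry {
    private final List<SegmentListener> listeners = new CopyOnWriteArrayList<>();

    void add(SegmentListener listener) {
        listeners.add(listener);
    }

    void remove(SegmentListener listener) {
        listeners.remove(listener);
    }

    void notifyFinished(SegmentStub segment) {
        for (SegmentListener listener : listeners) {
            listener.afterFinished(segment);
        }
    }
}
```

A listener that has been removed no longer sees finished segments, which is the behavior the add/remove calls in TraceSegmentServiceClient's onComplete and shutdown methods rely on.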
TraceSegment
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java
public class TraceSegment {
    private ID traceSegmentId;
    private List<TraceSegmentRef> refs;
    private List<AbstractTracingSpan> spans;
    private DistributedTraceIds relatedGlobalTraces;
    private boolean ignore = false;
    private boolean isSizeLimited = false;
    private final long createTime;

    public TraceSegment() {
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.spans = new LinkedList<AbstractTracingSpan>();
        this.relatedGlobalTraces = new DistributedTraceIds();
        this.relatedGlobalTraces.append(new NewDistributedTraceId());
        this.createTime = System.currentTimeMillis();
    }

    public void ref(TraceSegmentRef refSegment) {
        if (refs == null) {
            refs = new LinkedList<TraceSegmentRef>();
        }
        if (!refs.contains(refSegment)) {
            refs.add(refSegment);
        }
    }

    public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {
        relatedGlobalTraces.append(distributedTraceId);
    }

    public void archive(AbstractTracingSpan finishedSpan) {
        spans.add(finishedSpan);
    }

    public TraceSegment finish(boolean isSizeLimited) {
        this.isSizeLimited = isSizeLimited;
        return this;
    }

    public ID getTraceSegmentId() {
        return traceSegmentId;
    }

    public int getServiceId() {
        return RemoteDownstreamConfig.Agent.SERVICE_ID;
    }

    public boolean hasRef() {
        return !(refs == null || refs.size() == 0);
    }

    public List<TraceSegmentRef> getRefs() {
        return refs;
    }

    public List<DistributedTraceId> getRelatedGlobalTraces() {
        return relatedGlobalTraces.getRelatedGlobalTraces();
    }

    public boolean isSingleSpanSegment() {
        return this.spans != null && this.spans.size() == 1;
    }

    public boolean isIgnore() {
        return ignore;
    }

    public void setIgnore(boolean ignore) {
        this.ignore = ignore;
    }

    public UpstreamSegment transform() {
        UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();
        for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {
            upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());
        }
        SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();
        /**
         * Trace Segment
         */
        traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());
        // Don't serialize TraceSegmentReference

        // SpanObject
        for (AbstractTracingSpan span : this.spans) {
            traceSegmentBuilder.addSpans(span.transform());
        }
        traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);
        traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
        traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);

        upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());
        return upstreamBuilder.build();
    }

    @Override
    public String toString() {
        return "TraceSegment{" +
            "traceSegmentId='" + traceSegmentId + '\'' +
            ", refs=" + refs +
            ", spans=" + spans +
            ", relatedGlobalTraces=" + relatedGlobalTraces +
            '}';
    }

    public int getApplicationInstanceId() {
        return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;
    }

    public long createTime() {
        return this.createTime;
    }
}
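- TraceSegment holds the fields traceSegmentId, refs, spans, and relatedGlobalTraces, among others. It provides the methods ref, relatedGlobalTraces, archive, finish, and transform; transform builds the UpstreamSegment message, which carries the global trace IDs plus the SegmentObject (spans, service ID, service instance ID) serialized to bytes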
IConsumer
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
public interface IConsumer<T> {
    void init();

    void consume(List<T> data);

    void onError(List<T> data, Throwable t);

    void onExit();
}
- IConsumer defines four methods: init, consume, onError, and onExit
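The following sketch shows how the four callbacks fit together. The interface is redeclared under the hypothetical name BatchConsumer so that the example compiles on its own; CollectingConsumer and the deliver helper are likewise made up for illustration. The try/catch in deliver mirrors how ConsumerThread (shown later) routes a failure in consume to onError.

```java
import java.util.ArrayList;
import java.util.List;

// Same four callbacks as IConsumer, redeclared so the sketch is standalone.
interface BatchConsumer<T> {
    void init();

    void consume(List<T> data);

    void onError(List<T> data, Throwable t);

    void onExit();
}

// Hypothetical consumer that records what it receives.
class CollectingConsumer implements BatchConsumer<String> {
    final List<String> received = new ArrayList<String>();
    int errors;
    boolean exited;

    @Override
    public void init() {
        // nothing to prepare in this sketch
    }

    @Override
    public void consume(List<String> data) {
        for (String item : data) {
            if (item.isEmpty()) {
                throw new IllegalArgumentException("empty item");
            }
            received.add(item);
        }
    }

    @Override
    public void onError(List<String> data, Throwable t) {
        errors++;
    }

    @Override
    public void onExit() {
        exited = true;
    }
}

class BatchConsumerDemo {
    // Delivers one batch; any failure in consume() is handed to onError().
    static void deliver(BatchConsumer<String> consumer, List<String> batch) {
        try {
            consumer.consume(batch);
        } catch (Throwable t) {
            consumer.onError(batch, t);
        }
    }
}
```

Note that a failing batch may be partially processed before onError is called, so a real consumer has to decide for itself whether to retry, drop, or log the batch.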
TraceSegmentServiceClient
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
    private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);
    private static final int TIMEOUT = 30 * 1000;

    private long lastLogTime;
    private long segmentUplinkedCounter;
    private long segmentAbandonedCounter;
    private volatile DataCarrier<TraceSegment> carrier;
    private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    @Override
    public void boot() throws Throwable {
        lastLogTime = System.currentTimeMillis();
        segmentUplinkedCounter = 0;
        segmentAbandonedCounter = 0;
        carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);
        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
        carrier.consume(this, 1);
    }

    @Override
    public void onComplete() throws Throwable {
        TracingContext.ListenerManager.add(this);
    }

    @Override
    public void shutdown() throws Throwable {
        TracingContext.ListenerManager.remove(this);
        carrier.shutdownConsumers();
    }

    @Override
    public void init() {

    }

    @Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

    private void printUplinkStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - lastLogTime > 30 * 1000) {
            lastLogTime = currentTimeMillis;
            if (segmentUplinkedCounter > 0) {
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);
                segmentUplinkedCounter = 0;
            }
            if (segmentAbandonedCounter > 0) {
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);
                segmentAbandonedCounter = 0;
            }
        }
    }

    @Override
    public void onError(List<TraceSegment> data, Throwable t) {
        logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());
    }

    @Override
    public void onExit() {

    }

    @Override
    public void afterFinished(TraceSegment traceSegment) {
        if (traceSegment.isIgnore()) {
            return;
        }
        if (!carrier.produce(traceSegment)) {
            if (logger.isDebugEnable()) {
                logger.debug("One trace segment has been abandoned, cause by buffer is full.");
            }
        }
    }

    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);
        }
        this.status = status;
    }
}
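- TraceSegmentServiceClient implements the BootService, IConsumer, TracingContextListener, and GRPCChannelListener interfaces
- prepare registers the client with GRPCChannelManager as a channel listener
- boot initializes lastLogTime and the two counters, creates the DataCarrier with the IF_POSSIBLE buffer strategy, and registers the client itself as its only consumer
- onComplete calls TracingContext.ListenerManager.add(this); shutdown calls TracingContext.ListenerManager.remove(this) and carrier.shutdownConsumers()
- consume, when status is CONNECTED, calls upstreamSegmentStreamObserver.onNext(upstreamSegment) for each segment, then upstreamSegmentStreamObserver.onCompleted(), and finally blocks in status.wait4Finish(); when the channel is not connected, the batch is only counted as abandoned
- afterFinished skips segments marked as ignore and hands the rest to carrier.produce(traceSegment); if produce returns false, the segment is dropped and a debug message is logged
- statusChanged updates status and, on CONNECTED, rebuilds serviceStub from the current channel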
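The producer/consumer split described above can be sketched without SkyWalking or gRPC. In the example below a java.util.concurrent.ArrayBlockingQueue stands in for DataCarrier, and its non-blocking offer call plays the role of produce under the IF_POSSIBLE strategy; this substitution is an assumption made for illustration, not how DataCarrier is implemented. DroppingReporter and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical reporter: a bounded queue stands in for DataCarrier.
class DroppingReporter {
    private final BlockingQueue<String> buffer;
    private volatile boolean connected;
    long sent;
    long abandoned;

    DroppingReporter(int capacity) {
        this.buffer = new ArrayBlockingQueue<String>(capacity);
    }

    void setConnected(boolean connected) {
        this.connected = connected;
    }

    // Producer side (compare afterFinished): never blocks the caller.
    // Returns false when the buffer is full, in which case the item is dropped.
    boolean report(String segment) {
        return buffer.offer(segment);
    }

    // Consumer side (compare consume): drain one batch, then either "send" it
    // or count it as abandoned when there is no connection.
    List<String> flush() {
        List<String> batch = new ArrayList<String>();
        buffer.drainTo(batch);
        if (connected) {
            sent += batch.size();
            return batch;
        }
        abandoned += batch.size();
        return new ArrayList<String>();
    }
}
```

The design choice worth noting is that the traced application thread is never blocked: when the buffer is full or the collector is unreachable, segments are discarded rather than queued indefinitely.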
ConsumerThread
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
public class ConsumerThread<T> extends Thread {
    private volatile boolean running;
    private IConsumer<T> consumer;
    private List<DataSource> dataSources;
    private long consumeCycle;

    ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
        super(threadName);
        this.consumer = consumer;
        running = false;
        dataSources = new ArrayList<DataSource>(1);
        this.consumeCycle = consumeCycle;
    }

    /**
     * add whole buffer to consume
     *
     * @param sourceBuffer
     */
    void addDataSource(QueueBuffer<T> sourceBuffer) {
        this.dataSources.add(new DataSource(sourceBuffer));
    }

    @Override
    public void run() {
        running = true;

        final List<T> consumeList = new ArrayList<T>(1500);
        while (running) {
            if (!consume(consumeList)) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                }
            }
        }

        // consumer thread is going to stop
        // consume the last time
        consume(consumeList);

        consumer.onExit();
    }

    private boolean consume(List<T> consumeList) {
        for (DataSource dataSource : dataSources) {
            dataSource.obtain(consumeList);
        }

        if (!consumeList.isEmpty()) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            } finally {
                consumeList.clear();
            }
            return true;
        }
        return false;
    }

    void shutdown() {
        running = false;
    }

    /**
     * DataSource is a refer to {@link Buffer}.
     */
    class DataSource {
        private QueueBuffer<T> sourceBuffer;

        DataSource(QueueBuffer<T> sourceBuffer) {
            this.sourceBuffer = sourceBuffer;
        }

        void obtain(List<T> consumeList) {
            sourceBuffer.obtain(consumeList);
        }
    }
}
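- ConsumerThread extends Thread. Its run method calls consume(consumeList) in a loop while running is true, sleeping for consumeCycle milliseconds whenever a pass finds nothing to consume; after the loop exits it calls consume(consumeList) one last time and then consumer.onExit(). The consume method iterates over dataSources, calls dataSource.obtain(consumeList) on each, and, if consumeList is not empty, passes it to consumer.consume(consumeList), routing any exception to consumer.onError and clearing the list afterwards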
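The loop structure described above (poll, back off when idle, drain once more on shutdown) can be sketched as follows. PollingWorker and its fields are hypothetical names, and a plain java.util.Queue replaces the DataCarrier buffers. One deliberate difference from the excerpt: this sketch restores the interrupt flag and leaves the loop on InterruptedException instead of swallowing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical polling worker modelled on the loop described above.
class PollingWorker extends Thread {
    private final Queue<String> source;
    private final List<String> sink;
    private final long idleSleepMillis;
    private volatile boolean running = true;

    PollingWorker(Queue<String> source, List<String> sink, long idleSleepMillis) {
        super("polling-worker");
        this.source = source;
        this.sink = sink;
        this.idleSleepMillis = idleSleepMillis;
    }

    @Override
    public void run() {
        List<String> batch = new ArrayList<String>();
        while (running) {
            if (!drain(batch)) {
                try {
                    Thread.sleep(idleSleepMillis); // nothing to do: back off
                } catch (InterruptedException e) {
                    // Unlike the excerpt, keep the interrupt flag and stop looping.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // Final pass so items queued just before shutdown are not lost.
        drain(batch);
    }

    // Moves everything currently queued into the sink; true if anything moved.
    private boolean drain(List<String> batch) {
        String item;
        while ((item = source.poll()) != null) {
            batch.add(item);
        }
        if (batch.isEmpty()) {
            return false;
        }
        sink.addAll(batch);
        batch.clear();
        return true;
    }

    void shutdown() {
        running = false;
    }
}
```

The source queue should be thread-safe (for example a ConcurrentLinkedQueue), and the sink should only be read after the worker has been joined.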
ConsumeDriver
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
public class ConsumeDriver<T> implements IDriver {
    private boolean running;
    private ConsumerThread[] consumerThreads;
    private Channels<T> channels;
    private ReentrantLock lock;

    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
        this(channels, num);
        for (int i = 0; i < num; i++) {
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
            consumerThreads[i].setDaemon(true);
        }
    }

    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
        this(channels, num);
        prototype.init();
        for (int i = 0; i < num; i++) {
            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
            consumerThreads[i].setDaemon(true);
        }
    }

    private ConsumeDriver(Channels<T> channels, int num) {
        running = false;
        this.channels = channels;
        consumerThreads = new ConsumerThread[num];
        lock = new ReentrantLock();
    }

    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
        try {
            IConsumer<T> inst = consumerClass.newInstance();
            inst.init();
            return inst;
        } catch (InstantiationException e) {
            throw new ConsumerCannotBeCreatedException(e);
        } catch (IllegalAccessException e) {
            throw new ConsumerCannotBeCreatedException(e);
        }
    }

    @Override
    public void begin(Channels channels) {
        if (running) {
            return;
        }
        try {
            lock.lock();
            this.allocateBuffer2Thread();
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.start();
            }
            running = true;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isRunning(Channels channels) {
        return running;
    }

    private void allocateBuffer2Thread() {
        int channelSize = this.channels.getChannelSize();
        /**
         * if consumerThreads.length < channelSize
         * each consumer will process several channels.
         *
         * if consumerThreads.length == channelSize
         * each consumer will process one channel.
         *
         * if consumerThreads.length > channelSize
         * there will be some threads do nothing.
         */
        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
            int consumerIndex = channelIndex % consumerThreads.length;
            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
        }
    }

    @Override
    public void close(Channels channels) {
        try {
            lock.lock();
            this.running = false;
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.shutdown();
            }
        } finally {
            lock.unlock();
        }
    }
}
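- ConsumeDriver implements the IDriver interface. Its constructors create num ConsumerThread instances and mark them as daemon threads. The begin method calls allocateBuffer2Thread, which assigns each channel buffer to a consumer thread by channelIndex % consumerThreads.length, and then starts every consumerThread; the close method calls consumerThread.shutdown() on each thread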
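The modulo rule in allocateBuffer2Thread is easy to check in isolation. The sketch below applies the same channelIndex % consumerCount mapping to plain integers; ChannelAllocation and allocate are hypothetical names, and no SkyWalking types are involved.

```java
import java.util.ArrayList;
import java.util.List;

class ChannelAllocation {
    // Returns, for each consumer index, the list of channel indexes it owns,
    // using the same channelIndex % consumerCount rule as allocateBuffer2Thread.
    static List<List<Integer>> allocate(int channelCount, int consumerCount) {
        List<List<Integer>> owned = new ArrayList<List<Integer>>();
        for (int i = 0; i < consumerCount; i++) {
            owned.add(new ArrayList<Integer>());
        }
        for (int channel = 0; channel < channelCount; channel++) {
            owned.get(channel % consumerCount).add(channel);
        }
        return owned;
    }
}
```

With 5 channels and 2 consumers, consumer 0 owns channels 0, 2, and 4 while consumer 1 owns 1 and 3; with 2 channels and 3 consumers, the third consumer owns nothing and stays idle. Since TraceSegmentServiceClient calls carrier.consume(this, 1), its single consumer thread owns every channel.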
Summary
TraceSegmentServiceClient implements BootService, IConsumer, TracingContextListener, and GRPCChannelListener. During startup, prepare registers it with GRPCChannelManager as a channel listener, boot creates the DataCarrier and installs the client as its consumer, and onComplete adds it to TracingContext.ListenerManager. At runtime, afterFinished passes each finished TraceSegment to carrier.produce(traceSegment); a ConsumerThread then delivers batches to consume, which, when status is CONNECTED, calls upstreamSegmentStreamObserver.onNext(upstreamSegment) for each segment, followed by upstreamSegmentStreamObserver.onCompleted() and status.wait4Finish(). statusChanged keeps serviceStub and status in sync with the gRPC channel, and shutdown removes the listener and calls carrier.shutdownConsumers().