Zeebe Storage Source Code Analysis
2021/12/22 12:49:50
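Zeebe Source Code Analysis
Zeebe Call Path Source Code Analysis
ZeebeClient calls the Gateway over gRPC, the Gateway dispatches the request to a Broker through Netty, and the Broker runs the processing logic.
Client code talks to Zeebe through ZeebeClient. The most common use is registering a worker that completes jobs: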
zeebeClient.newWorker()
    .jobType(ZeebeConfigProperties.CONFIG_UPDATE_PIPELINE_EXECUATION_ERROR)
    .handler((jobClient, job) -> configChangeWorkers.errorHandle(jobClient, job))
    .fetchVariables(Workers.CONTEXT_ID, Workers.CHANGE_REQUEST, Workers.DEVICE_INFO, Workers.MANAGED_OBEJCT)
    .open();
The open method instantiates JobWorkerImpl. The JobWorkerImpl constructor sets up a scheduled task, tryActivateJobs, that keeps pulling jobs:
public JobWorkerImpl(final int maxJobsActive,
                     final ScheduledExecutorService executor,
                     final Duration pollInterval,
                     final JobRunnableFactory jobRunnableFactory,
                     final JobPoller jobPoller) {
    this.maxJobsActive = maxJobsActive;
    this.activationThreshold = Math.round((float)maxJobsActive * 0.3F);
    this.remainingJobs = new AtomicInteger(0);
    this.executor = executor;
    this.jobRunnableFactory = jobRunnableFactory;
    this.jobPoller = new AtomicReference(jobPoller);
    executor.scheduleWithFixedDelay(this::tryActivateJobs, 0L, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
}
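To make the threshold arithmetic in the constructor concrete, here is a minimal sketch. activationThreshold repeats the expression from the constructor. The comparison in shouldActivateJobs is an assumption, because the article calls that method further down without showing its body, and the class name ActivationThresholdSketch is hypothetical.

```java
final class ActivationThresholdSketch {

  // Same expression as in the JobWorkerImpl constructor: 30% of maxJobsActive, rounded.
  static int activationThreshold(final int maxJobsActive) {
    return Math.round((float) maxJobsActive * 0.3F);
  }

  // ASSUMPTION (body not shown in the article): poll again once the number of
  // unfinished jobs is at or below the threshold.
  static boolean shouldActivateJobs(final int remainingJobs, final int maxJobsActive) {
    return remainingJobs <= activationThreshold(maxJobsActive);
  }

  // Mirrors "maxJobsActive - currentRemainingJobs" from activateJobs.
  static int jobsToRequest(final int remainingJobs, final int maxJobsActive) {
    return maxJobsActive - remainingJobs;
  }

  public static void main(final String[] args) {
    final int maxJobsActive = 32;
    System.out.println("threshold = " + activationThreshold(maxJobsActive));
    System.out.println("poll with 10 remaining? " + shouldActivateJobs(10, maxJobsActive));
    System.out.println("jobs to request = " + jobsToRequest(10, maxJobsActive));
  }
}
```

With maxJobsActive = 32 the threshold is 10, so under the assumed comparison a worker holding 10 unfinished jobs would ask for 22 more.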
In activateJobs, jobPoller.poll pulls the jobs, and this::submitJob handles each job once it has been pulled:
private void activateJobs() {
    JobPoller jobPoller = (JobPoller)this.jobPoller.getAndSet((Object)null);
    if (jobPoller != null) {
        int currentRemainingJobs = this.remainingJobs.get();
        if (this.shouldActivateJobs(currentRemainingJobs)) {
            int maxActivatedJobs = this.maxJobsActive - currentRemainingJobs;
            try {
                jobPoller.poll(maxActivatedJobs, this::submitJob, (activatedJobs) -> {
                    this.remainingJobs.addAndGet(activatedJobs);
                    this.jobPoller.set(jobPoller);
                }, this::isOpen);
            } catch (Exception var5) {
                LOG.warn("Failed to activate jobs", var5);
                this.jobPoller.set(jobPoller);
            }
        } else {
            this.jobPoller.set(jobPoller);
        }
    }
}
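The getAndSet(null) call in activateJobs works as a non-blocking guard: whoever takes the poller out of the AtomicReference owns it until it is put back, so overlapping scheduler ticks cannot start a second poll. A simplified sketch of that pattern follows. It is synchronous, whereas the code above hands the poller back from an asynchronous callback, and PollerGuardSketch with its IntUnaryOperator poller is a hypothetical stand-in for JobPoller.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntUnaryOperator;

final class PollerGuardSketch {

  // Hypothetical poller: receives the max number of jobs, returns how many were activated.
  private final AtomicReference<IntUnaryOperator> poller;
  private final AtomicInteger remainingJobs = new AtomicInteger(0);
  private final int maxJobsActive;

  PollerGuardSketch(final IntUnaryOperator poller, final int maxJobsActive) {
    this.poller = new AtomicReference<>(poller);
    this.maxJobsActive = maxJobsActive;
  }

  // Returns true if this call performed a poll, false if a poll was already in flight.
  boolean tryPoll() {
    final IntUnaryOperator current = poller.getAndSet(null); // take ownership of the poller
    if (current == null) {
      return false; // someone else holds the poller right now
    }
    try {
      final int activated = current.applyAsInt(maxJobsActive - remainingJobs.get());
      remainingJobs.addAndGet(activated);
      return true;
    } finally {
      poller.set(current); // hand the poller back, on success and on failure
    }
  }

  int remainingJobs() {
    return remainingJobs.get();
  }

  public static void main(final String[] args) {
    final PollerGuardSketch worker = new PollerGuardSketch(max -> max, 5);
    System.out.println("polled: " + worker.tryPoll());
    System.out.println("remaining jobs: " + worker.remainingJobs());
  }
}
```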
The poll method calls Zeebe over gRPC:
private void poll() {
    LOG.trace("Polling at max {} jobs for worker {} and job type {}",
        new Object[]{this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType()});
    ((GatewayStub)this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS))
        .activateJobs(this.requestBuilder.build(), this);
}
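onNext handles each response that comes back: it adds the number of returned jobs to activatedJobs, wraps each job in an ActivatedJobImpl, and passes it to jobConsumer so the worker can process it: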
public void onNext(final ActivateJobsResponse activateJobsResponse) {
    this.activatedJobs += activateJobsResponse.getJobsCount();
    activateJobsResponse.getJobsList().stream().map((job) -> {
        return new ActivatedJobImpl(this.objectMapper, job);
    }).forEach(this.jobConsumer);
}
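The accumulate-and-forward shape of onNext can be sketched without any gRPC dependency. Everything here is hypothetical: jobs are plain strings, JobStreamObserverSketch stands in for the real poller, and reporting the total from onCompleted is an assumption about how the callback passed to poll gets its activatedJobs count.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

final class JobStreamObserverSketch {

  private final Consumer<String> jobConsumer; // plays the role of this::submitJob
  private final IntConsumer doneCallback;     // receives the total number of activated jobs
  private int activatedJobs;

  JobStreamObserverSketch(final Consumer<String> jobConsumer, final IntConsumer doneCallback) {
    this.jobConsumer = jobConsumer;
    this.doneCallback = doneCallback;
  }

  // One call per streamed response: count the batch, then forward every job.
  void onNext(final List<String> jobsInResponse) {
    activatedJobs += jobsInResponse.size();
    jobsInResponse.forEach(jobConsumer);
  }

  // ASSUMPTION: the total is reported once the stream ends.
  void onCompleted() {
    doneCallback.accept(activatedJobs);
  }

  public static void main(final String[] args) {
    final JobStreamObserverSketch observer =
        new JobStreamObserverSketch(
            job -> System.out.println("handling " + job),
            total -> System.out.println("activated " + total + " jobs"));
    observer.onNext(Arrays.asList("job-1", "job-2"));
    observer.onNext(Arrays.asList("job-3"));
    observer.onCompleted();
  }
}
```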
Zeebe's proto file is defined in the gateway-protocol project.
GatewayGrpcService is the entry point of the Zeebe API:
@Override
public void activateJobs(
    final ActivateJobsRequest request,
    final StreamObserver<ActivateJobsResponse> responseObserver) {
  endpointManager.activateJobs(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}

@Override
public void cancelProcessInstance(
    final CancelProcessInstanceRequest request,
    final StreamObserver<CancelProcessInstanceResponse> responseObserver) {
  endpointManager.cancelProcessInstance(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}

@Override
public void completeJob(
    final CompleteJobRequest request,
    final StreamObserver<CompleteJobResponse> responseObserver) {
  endpointManager.completeJob(
      request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
}
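Taking completeJob as the example, endpointManager.completeJob calls sendRequest. RequestMapper::toCompleteJobRequest is the method reference that maps the request parameters, and ResponseMapper::toCompleteJobResponse is the method reference that maps the result: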
public void completeJob(
    final CompleteJobRequest request,
    final ServerStreamObserver<CompleteJobResponse> responseObserver) {
  sendRequest(
      request,
      RequestMapper::toCompleteJobRequest,
      ResponseMapper::toCompleteJobResponse,
      responseObserver);
}
sendRequest first converts the gRPC request into a brokerRequest, then forwards it to the broker through brokerClient:
private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
    final GrpcRequestT grpcRequest,
    final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
    final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
    final ServerStreamObserver<GrpcResponseT> streamObserver) {
  final BrokerRequest<BrokerResponseT> brokerRequest;
  try {
    brokerRequest = requestMapper.apply(grpcRequest);
  } catch (final Exception e) {
    streamObserver.onError(e);
    return;
  }
  brokerClient.sendRequestWithRetry(
      brokerRequest,
      (key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
      streamObserver::onError);
}
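The map, forward, map-back structure of sendRequest can be reduced to a few lines. This is only a sketch under assumptions: the Observer interface, the broker function, and the class name RequestMappingSketch are all hypothetical, and the real brokerClient answers asynchronously with retries rather than through a synchronous function call.

```java
import java.util.function.Function;

final class RequestMappingSketch {

  // Hypothetical, minimal replacement for the gRPC stream observer.
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable error);
  }

  static <GrpcReq, BrokerReq, BrokerResp, GrpcResp> void sendRequest(
      final GrpcReq grpcRequest,
      final Function<GrpcReq, BrokerReq> requestMapper,
      final Function<BrokerReq, BrokerResp> broker, // hypothetical synchronous broker call
      final Function<BrokerResp, GrpcResp> responseMapper,
      final Observer<GrpcResp> observer) {
    final BrokerReq brokerRequest;
    try {
      // Step 1: translate the gRPC request; a mapping failure never reaches the broker.
      brokerRequest = requestMapper.apply(grpcRequest);
    } catch (final Exception e) {
      observer.onError(e);
      return;
    }
    try {
      // Step 2: call the broker, then translate its answer back for the client.
      observer.onNext(responseMapper.apply(broker.apply(brokerRequest)));
    } catch (final Exception e) {
      observer.onError(e);
    }
  }

  public static void main(final String[] args) {
    final Observer<String> printer =
        new Observer<String>() {
          @Override
          public void onNext(final String value) {
            System.out.println("response: " + value);
          }

          @Override
          public void onError(final Throwable error) {
            System.out.println("error: " + error);
          }
        };
    RequestMappingSketch.<String, Long, Long, String>sendRequest(
        "41", Long::parseLong, key -> key + 1, result -> "completed " + result, printer);
  }
}
```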
sendRequestInternal first determines which broker should handle the request, sender.send issues the call, and actor.runOnCompletion processes the returned result:
private <T> void sendRequestInternal(
    final BrokerRequest<T> request,
    final CompletableFuture<BrokerResponse<T>> returnFuture,
    final TransportRequestSender sender,
    final Duration requestTimeout) {
  final BrokerAddressProvider nodeIdProvider;
  try {
    nodeIdProvider = determineBrokerNodeIdProvider(request);
  } catch (final PartitionNotFoundException e) {
    returnFuture.completeExceptionally(e);
    GatewayMetrics.registerFailedRequest(
        request.getPartitionId(), request.getType(), "PARTITION_NOT_FOUND");
    return;
  } catch (final NoTopologyAvailableException e) {
    returnFuture.completeExceptionally(e);
    GatewayMetrics.registerFailedRequest(
        request.getPartitionId(), request.getType(), "NO_TOPOLOGY");
    return;
  }
  final ActorFuture<DirectBuffer> responseFuture =
      sender.send(clientTransport, nodeIdProvider, request, requestTimeout);
  final long startTime = System.currentTimeMillis();
  actor.runOnCompletion(
      responseFuture,
      (clientResponse, error) -> {
        RequestResult result = null;
        try {
          if (error == null) {
            final BrokerResponse<T> response = request.getResponse(clientResponse);
            result = handleResponse(response, returnFuture);
            if (result.wasProcessed()) {
              final long elapsedTime = System.currentTimeMillis() - startTime;
              GatewayMetrics.registerSuccessfulRequest(
                  request.getPartitionId(), request.getType(), elapsedTime);
              return;
            }
          } else {
            returnFuture.completeExceptionally(error);
          }
        } catch (final RuntimeException e) {
          returnFuture.completeExceptionally(new ClientResponseException(e));
        }
        registerFailure(request, result, error);
      });
}
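The completion callback has three outcomes: the transport failed, the response could not be decoded, or the response was decoded and delivered. A rough sketch of that branching with plain CompletableFuture is shown here. It is not Zeebe's actor API: ResponseCompletionSketch, the decoder function, and the IllegalStateException wrapper (standing in for ClientResponseException) are illustrative assumptions, and metrics are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ResponseCompletionSketch {

  static <T> CompletableFuture<T> complete(
      final CompletableFuture<byte[]> transportFuture, final Function<byte[], T> decoder) {
    final CompletableFuture<T> returnFuture = new CompletableFuture<>();
    transportFuture.whenComplete(
        (rawResponse, error) -> {
          if (error != null) {
            // Transport-level failure: pass it straight to the caller.
            returnFuture.completeExceptionally(error);
            return;
          }
          try {
            // Success path: decode the raw bytes into a typed response.
            returnFuture.complete(decoder.apply(rawResponse));
          } catch (final RuntimeException e) {
            // Decoding failure: wrap it so the caller can tell it apart from transport errors.
            returnFuture.completeExceptionally(
                new IllegalStateException("could not decode broker response", e));
          }
        });
    return returnFuture;
  }

  public static void main(final String[] args) {
    final CompletableFuture<byte[]> transport =
        CompletableFuture.completedFuture("ok".getBytes(StandardCharsets.UTF_8));
    final CompletableFuture<String> result =
        complete(transport, bytes -> new String(bytes, StandardCharsets.UTF_8));
    System.out.println(result.join());
  }
}
```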
What sender.send actually invokes:
private static final TransportRequestSender SENDER_WITH_RETRY =
    (c, s, r, t) -> c.sendRequestWithRetry(s, BrokerRequestManager::responseValidation, r, t);
The call finally ends up in messagingService.sendAndReceive(nodeAddress, requestContext.getTopicName(), requestBytes, calculateTimeout), which reaches the broker over Netty.
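On the receiving side, dispatch looks up the handler registered for the message subject, and handler.accept(message, this) processes the request: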
@Override
public void dispatch(final ProtocolRequest message) {
  final String subject = message.subject();
  final BiConsumer<ProtocolRequest, ServerConnection> handler = handlers.get(subject);
  if (handler != null) {
    log.trace("Received message type {} from {}", subject, message.sender());
    handler.accept(message, this);
  } else {
    log.debug("No handler for message type {} from {}", subject, message.sender());
    byte[] subjectBytes = null;
    if (subject != null) {
      subjectBytes = StringUtil.getBytes(subject);
    }
    reply(message, ProtocolReply.Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
  }
}
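Dispatching by subject is a map lookup with a fallback reply. A self-contained sketch of the idea follows. SubjectDispatcherSketch, its Reply type, and the "example-subject" name are hypothetical, and the handler returns its reply directly instead of writing to a connection as the real handlers do.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class SubjectDispatcherSketch {

  enum Status { OK, ERROR_NO_HANDLER }

  // Hypothetical reply: a status plus an optional payload.
  static final class Reply {
    final Status status;
    final Optional<byte[]> payload;

    Reply(final Status status, final Optional<byte[]> payload) {
      this.status = status;
      this.payload = payload;
    }
  }

  private final Map<String, Function<byte[], byte[]>> handlers = new ConcurrentHashMap<>();

  void register(final String subject, final Function<byte[], byte[]> handler) {
    handlers.put(subject, handler);
  }

  Reply dispatch(final String subject, final byte[] payload) {
    // ConcurrentHashMap rejects null keys, so a missing subject is handled up front.
    final Function<byte[], byte[]> handler = subject == null ? null : handlers.get(subject);
    if (handler != null) {
      return new Reply(Status.OK, Optional.of(handler.apply(payload)));
    }
    // No handler: echo the subject bytes back, as the dispatch method above does.
    final byte[] subjectBytes =
        subject == null ? null : subject.getBytes(StandardCharsets.UTF_8);
    return new Reply(Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
  }

  public static void main(final String[] args) {
    final SubjectDispatcherSketch dispatcher = new SubjectDispatcherSketch();
    dispatcher.register("example-subject", request -> request); // echo handler
    System.out.println(dispatcher.dispatch("example-subject", new byte[] {1}).status);
    System.out.println(dispatcher.dispatch("unknown", new byte[] {1}).status);
  }
}
```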