package com.twosigma.waiter.courier;

import com.twosigma.waiter.courier.CourierGrpc;
import com.twosigma.waiter.courier.StateReply;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/* loaded from: input_file:com/twosigma/waiter/courier/GrpcServer.class */
public class GrpcServer {
    /** Logger shared by the server and all of its nested classes; named after this class. */
    private static final Logger LOGGER = Logger.getLogger(GrpcServer.class.getName());
    /**
     * Context key carrying the correlation id and timestamp of the current call. The value is
     * attached by {@code CorrelationIdInterceptor} and read by {@code GrpcServerInterceptor} and
     * {@code registerOnCancelHandler}.
     */
    private static final Context.Key<CidTimestamp> CID_TIMESTAMP = Context.key("CID.TIMESTAMP");
    /**
     * Recorded call states, keyed by correlation id and then by call timestamp. Both
     * {@code trackState} overloads synchronize on this map before touching it. Nothing in this
     * file removes entries from it.
     */
    private static final Map<String, Map<Long, List<String>>> requestCidToStateList = new HashMap();
    /** The running gRPC server; assigned by {@code start(int)} and {@code null} before that. */
    private Server server;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/twosigma/waiter/courier/GrpcServer$CidTimestamp.class */
    /**
     * Immutable pair of a request correlation id and the timestamp associated with the call.
     * Instances can only be created from within {@code GrpcServer} because the constructor is
     * private.
     */
    public static class CidTimestamp {
        private final String correlationId;
        private final long timestamp;

        /**
         * @param str the correlation id of the call
         * @param j the timestamp associated with the call
         */
        private CidTimestamp(String str, long j) {
            this.timestamp = j;
            this.correlationId = str;
        }

        /**
         * Renders both fields, e.g. {@code CidTimestamp{correlationId='abc', timestamp=42}}.
         *
         * @return a human-readable description used in log messages
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CidTimestamp{correlationId='");
            text.append(this.correlationId);
            text.append("', timestamp=");
            text.append(this.timestamp);
            text.append("}");
            return text.toString();
        }
    }

    /* loaded from: input_file:com/twosigma/waiter/courier/GrpcServer$CorrelationIdInterceptor.class */
    /**
     * Server interceptor that determines the correlation id of each incoming call and publishes
     * it, together with a {@link System#nanoTime()} timestamp, in the gRPC {@link Context} under
     * {@code GrpcServer.CID_TIMESTAMP}.
     */
    public class CorrelationIdInterceptor implements ServerInterceptor {
        public CorrelationIdInterceptor() {
        }

        /**
         * Reads the {@code x-cid} request header, falling back to a generated
         * {@code courier-<nanoTime>} id when the header is absent, and continues the call inside
         * a context that carries the resulting {@link CidTimestamp}.
         *
         * @param serverCall the call being intercepted
         * @param metadata the request headers
         * @param serverCallHandler the next handler in the interceptor chain
         * @return the listener produced by the rest of the chain, bound to the new context
         */
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
            Metadata.Key<String> cidHeader = Metadata.Key.of("x-cid", Metadata.ASCII_STRING_MARSHALLER);
            String headerCid = metadata.get(cidHeader);
            String correlationId = headerCid != null ? headerCid : "courier-" + System.nanoTime();
            // The timestamp is taken after the id is resolved, as a second nanoTime() reading.
            CidTimestamp callInfo = new CidTimestamp(correlationId, System.nanoTime());
            Context callContext = Context.current().withValue(GrpcServer.CID_TIMESTAMP, callInfo);
            return Contexts.interceptCall(callContext, serverCall, metadata, serverCallHandler);
        }
    }

    /* loaded from: input_file:com/twosigma/waiter/courier/GrpcServer$CourierImpl.class */
    /**
     * Implementation of the Courier gRPC service. Besides the normal replies, each handler
     * honours the request's {@code Variant} to simulate failures: {@code SEND_ERROR} answers with
     * a CANCELLED status and the {@code EXIT_*} variants terminate the JVM.
     */
    private static class CourierImpl extends CourierGrpc.CourierImplBase {
        private CourierImpl() {
        }

        /**
         * Replies with the states recorded for the correlation id named in the request.
         *
         * @param stateRequest the request carrying the correlation id to look up
         * @param streamObserver receives a single {@code StateReply} followed by completion
         */
        @Override // com.twosigma.waiter.courier.CourierGrpc.CourierImplBase
        public void retrieveState(StateRequest stateRequest, StreamObserver<StateReply> streamObserver) {
            String cid = stateRequest.getCid();
            GrpcServer.LOGGER.info("received StateRequest{cid=" + cid + "}");
            GrpcServer.registerOnCancelHandler(streamObserver);
            StateReply.Builder replyBuilder = StateReply.newBuilder().setCid(cid);
            List<String> states = GrpcServer.trackState(cid);
            GrpcServer.LOGGER.info("cid " + cid + " has states: " + states);
            if (states != null) {
                for (String state : states) {
                    replyBuilder.addState(state);
                }
            }
            StateReply reply = replyBuilder.m194build();
            GrpcServer.LOGGER.info("Sending StateReply for cid=" + reply.getCid());
            streamObserver.onNext(reply);
            streamObserver.onCompleted();
        }

        /**
         * Unary call: sleeps for the requested duration and then, depending on the variant,
         * sends an error, exits the JVM, or echoes the package back with response "received".
         *
         * @param courierRequest the incoming package
         * @param streamObserver receives the reply or the error
         */
        @Override // com.twosigma.waiter.courier.CourierGrpc.CourierImplBase
        public void sendPackage(CourierRequest courierRequest, StreamObserver<CourierReply> streamObserver) {
            GrpcServer.LOGGER.info("received CourierRequest{id=" + courierRequest.getId() + ", from=" + courierRequest.getFrom() + ", message.length=" + courierRequest.getMessage().length() + "}");
            GrpcServer.registerOnCancelHandler(streamObserver);
            GrpcServer.sleep(courierRequest.getSleepDurationMillis());
            if (Variant.SEND_ERROR.equals(courierRequest.getVariant())) {
                CourierImpl.sendCancelledError(courierRequest, streamObserver);
            } else if (Variant.EXIT_PRE_RESPONSE.equals(courierRequest.getVariant())) {
                CourierImpl.exitAbruptly();
            } else {
                CourierReply reply = CourierReply.newBuilder().setId(courierRequest.getId()).setMessage(courierRequest.getMessage()).setResponse("received").m47build();
                GrpcServer.LOGGER.info("Sending CourierReply for id=" + reply.getId());
                streamObserver.onNext(reply);
                streamObserver.onCompleted();
            }
        }

        /**
         * Bidirectional streaming call: after every received package a running
         * {@code CourierSummary} (message count and total message length so far) is sent back.
         *
         * @param streamObserver receives one summary per request message
         * @return the observer that consumes the client's request stream
         */
        @Override // com.twosigma.waiter.courier.CourierGrpc.CourierImplBase
        public StreamObserver<CourierRequest> collectPackages(final StreamObserver<CourierSummary> streamObserver) {
            GrpcServer.registerOnCancelHandler(streamObserver);
            return new StreamObserver<CourierRequest>() {
                private long numMessages = 0;
                private long totalLength = 0;

                @Override
                public void onNext(CourierRequest courierRequest) {
                    GrpcServer.LOGGER.info("Received CourierRequest id=" + courierRequest.getId());
                    this.numMessages++;
                    this.totalLength += courierRequest.getMessage().length();
                    // Log the running totals (the total length, not the logger object).
                    GrpcServer.LOGGER.info("Summary of collected packages: numMessages=" + this.numMessages + " with totalLength=" + this.totalLength);
                    GrpcServer.sleep(courierRequest.getSleepDurationMillis());
                    if (Variant.EXIT_PRE_RESPONSE.equals(courierRequest.getVariant())) {
                        CourierImpl.exitAbruptly();
                    } else if (Variant.SEND_ERROR.equals(courierRequest.getVariant())) {
                        CourierImpl.sendCancelledError(courierRequest, streamObserver);
                    } else {
                        CourierSummary summary = CourierSummary.newBuilder().setNumMessages(this.numMessages).setTotalLength(this.totalLength).m141build();
                        GrpcServer.LOGGER.info("Sending CourierSummary for id=" + courierRequest.getId());
                        streamObserver.onNext(summary);
                    }
                    // EXIT_POST_RESPONSE takes the normal reply branch above and only then exits.
                    if (Variant.EXIT_POST_RESPONSE.equals(courierRequest.getVariant())) {
                        CourierImpl.exitAbruptly();
                    }
                }

                @Override
                public void onError(Throwable th) {
                    GrpcServer.LOGGER.severe("Error in collecting packages: " + th.getMessage());
                    streamObserver.onError(th);
                }

                @Override
                public void onCompleted() {
                    GrpcServer.LOGGER.severe("Completed collecting packages");
                    streamObserver.onCompleted();
                }
            };
        }

        /**
         * Client streaming call: packages are counted as they arrive and a single
         * {@code CourierSummary} is sent once the client completes its stream.
         *
         * @param streamObserver receives the final summary, or an error
         * @return the observer that consumes the client's request stream
         */
        @Override // com.twosigma.waiter.courier.CourierGrpc.CourierImplBase
        public StreamObserver<CourierRequest> aggregatePackages(final StreamObserver<CourierSummary> streamObserver) {
            GrpcServer.registerOnCancelHandler(streamObserver);
            return new StreamObserver<CourierRequest>() {
                private long numMessages = 0;
                private long totalLength = 0;

                @Override
                public void onNext(CourierRequest courierRequest) {
                    GrpcServer.LOGGER.info("Received CourierRequest id=" + courierRequest.getId());
                    this.numMessages++;
                    this.totalLength += courierRequest.getMessage().length();
                    // Log the running totals (the total length, not the logger object).
                    GrpcServer.LOGGER.info("Summary of collected packages: numMessages=" + this.numMessages + " with totalLength=" + this.totalLength);
                    GrpcServer.sleep(courierRequest.getSleepDurationMillis());
                    // No per-message reply exists here, so both EXIT variants exit immediately.
                    if (Variant.EXIT_PRE_RESPONSE.equals(courierRequest.getVariant()) || Variant.EXIT_POST_RESPONSE.equals(courierRequest.getVariant())) {
                        CourierImpl.exitAbruptly();
                    } else if (Variant.SEND_ERROR.equals(courierRequest.getVariant())) {
                        CourierImpl.sendCancelledError(courierRequest, streamObserver);
                    }
                }

                @Override
                public void onError(Throwable th) {
                    GrpcServer.LOGGER.severe("Error in aggregating packages: " + th.getMessage());
                    streamObserver.onError(th);
                }

                @Override
                public void onCompleted() {
                    GrpcServer.LOGGER.severe("Completed aggregating packages");
                    CourierSummary summary = CourierSummary.newBuilder().setNumMessages(this.numMessages).setTotalLength(this.totalLength).m141build();
                    GrpcServer.LOGGER.info("Sending aggregated CourierSummary");
                    streamObserver.onNext(summary);
                    streamObserver.onCompleted();
                }
            };
        }

        /** Waits one second, logs, and terminates the JVM with exit code 1. */
        private static void exitAbruptly() {
            GrpcServer.sleep(1000L);
            GrpcServer.LOGGER.info("Exiting server abruptly");
            System.exit(1);
        }

        /** Sends a CANCELLED "Cancelled by server" error whose cause carries the request id. */
        private static void sendCancelledError(CourierRequest courierRequest, StreamObserver<?> responseObserver) {
            StatusRuntimeException error = Status.CANCELLED.withCause(new RuntimeException(courierRequest.getId())).withDescription("Cancelled by server").asRuntimeException();
            GrpcServer.LOGGER.info("Sending cancelled by server error");
            responseObserver.onError(error);
        }
    }

    /* loaded from: input_file:com/twosigma/waiter/courier/GrpcServer$GrpcServerInterceptor.class */
    private static class GrpcServerInterceptor implements ServerInterceptor {
        private GrpcServerInterceptor() {
        }

        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, final Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
            logMetadata(metadata, "request");
            CidTimestamp cidTimestamp = (CidTimestamp) GrpcServer.CID_TIMESTAMP.get();
            final String str = cidTimestamp.correlationId;
            final long j = cidTimestamp.timestamp;
            GrpcServer.trackState(str, j, "INIT");
            final ServerCall.Listener startCall = serverCallHandler.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) { // from class: com.twosigma.waiter.courier.GrpcServer.GrpcServerInterceptor.1
                public void sendHeaders(Metadata metadata2) {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.sendHeaders[cid=" + str + "]");
                    GrpcServerInterceptor.this.logMetadata(metadata, "response");
                    if (str != null) {
                        GrpcServer.LOGGER.info("response linked to cid: " + str);
                        metadata2.put(Metadata.Key.of("x-cid", Metadata.ASCII_STRING_MARSHALLER), str);
                        GrpcServer.trackState(str, j, "SEND_HEADERS");
                    }
                    super.sendHeaders(metadata2);
                }

                public void sendMessage(RespT respt) {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.sendMessage[cid=" + str + "]");
                    GrpcServer.trackState(str, j, "SEND_MESSAGE");
                    super.sendMessage(respt);
                }

                public void close(Status status, Metadata metadata2) {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.close[cid=" + str + "] " + status + ", " + metadata2);
                    GrpcServer.trackState(str, j, "CLOSE");
                    super.close(status, metadata2);
                }
            }, metadata);
            return new ServerCall.Listener<ReqT>() { // from class: com.twosigma.waiter.courier.GrpcServer.GrpcServerInterceptor.2
                public void onMessage(ReqT reqt) {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.onMessage[cid=" + str + "]");
                    GrpcServer.trackState(str, j, "RECEIVE_MESSAGE");
                    startCall.onMessage(reqt);
                }

                public void onHalfClose() {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.onHalfClose[cid=" + str + "]");
                    GrpcServer.trackState(str, j, "HALF_CLOSE");
                    startCall.onHalfClose();
                }

                public void onCancel() {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.onCancel[cid=" + str + "]");
                    GrpcServer.LOGGER.info(str + " states: " + GrpcServer.trackState(str, j, "CANCEL"));
                    startCall.onCancel();
                }

                public void onComplete() {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.onComplete[cid=" + str + "]");
                    GrpcServer.LOGGER.info(str + " states: " + GrpcServer.trackState(str, j, "COMPLETE"));
                    startCall.onComplete();
                }

                public void onReady() {
                    GrpcServer.LOGGER.info("GrpcServerInterceptor.onReady[cid=" + str + "]");
                    GrpcServer.trackState(str, j, "READY");
                    startCall.onReady();
                }
            };
        }

        private void logMetadata(Metadata metadata, String str) {
            Set<String> keys = metadata.keys();
            GrpcServer.LOGGER.info(str + "@" + metadata.hashCode() + " metadata keys = " + keys);
            for (String str2 : keys) {
                GrpcServer.LOGGER.info(str + " metadata " + str2 + " = " + ((String) metadata.get(Metadata.Key.of(str2, Metadata.ASCII_STRING_MARSHALLER))));
            }
        }
    }

    /**
     * Appends a state to the list recorded for the given correlation id and call timestamp,
     * creating the intermediate map and the list on first use.
     *
     * <p>The returned list is the stored one, not a copy. Callers concatenate it into log
     * messages outside the lock while other threads may still be appending, so each per-call
     * list is created as a {@link Collections#synchronizedList synchronized list}: its
     * {@code toString()} and {@code add()} are then mutually exclusive. Iterating the returned
     * list directly still needs external synchronization on the list.
     *
     * @param str the correlation id; when {@code null} nothing is recorded
     * @param j the timestamp identifying the call among those sharing the correlation id
     * @param str2 the state name to append
     * @return the stored state list after the append, or a fresh empty list when {@code str}
     *     is {@code null}
     */
    private static List<String> trackState(String str, long j, String str2) {
        if (str == null) {
            return new ArrayList<>();
        }
        synchronized (requestCidToStateList) {
            Map<Long, List<String>> statesByTimestamp =
                    requestCidToStateList.computeIfAbsent(str, cid -> new HashMap<Long, List<String>>());
            List<String> states = statesByTimestamp.computeIfAbsent(
                    Long.valueOf(j), timestamp -> Collections.synchronizedList(new ArrayList<String>()));
            states.add(str2);
            return states;
        }
    }

    /**
     * Returns a snapshot of the states recorded for a correlation id. When several calls share
     * the id, the states of the call with the smallest timestamp are returned.
     *
     * @param str the correlation id to look up
     * @return a new list the caller may freely modify; empty (never {@code null}) when nothing
     *     has been recorded for {@code str}
     */
    private static List<String> trackState(String str) {
        synchronized (requestCidToStateList) {
            Map<Long, List<String>> statesByTimestamp = requestCidToStateList.get(str);
            if (statesByTimestamp == null || statesByTimestamp.isEmpty()) {
                return new ArrayList<String>();
            }
            // Only the smallest key is needed, so take the minimum instead of sorting a copy.
            Long earliest = Collections.min(statesByTimestamp.keySet());
            List<String> states = statesByTimestamp.get(earliest);
            if (states == null) {
                return new ArrayList<String>();
            }
            return new ArrayList<String>(states);
        }
    }

    /**
     * Pauses the current thread for the given duration; non-positive durations return at once.
     *
     * <p>If the thread is interrupted while sleeping, the stack trace is printed, the sleep ends
     * early, and the thread's interrupt status is restored so callers further up can still
     * observe the interruption.
     *
     * @param j the time to sleep, in milliseconds
     */
    private static void sleep(long j) {
        if (j <= 0) {
            return;
        }
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; set it again.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Installs a cancellation callback on the response observer that logs the cancellation and
     * records a {@code CANCEL_HANDLER} state for the current call. Observers that are not
     * {@link ServerCallStreamObserver} instances are left untouched.
     *
     * <p>The correlation id and timestamp are read from the gRPC context at registration time,
     * not when the callback fires.
     *
     * @param streamObserver the response observer handed to a service method
     */
    private static void registerOnCancelHandler(StreamObserver<?> streamObserver) {
        if (!(streamObserver instanceof ServerCallStreamObserver)) {
            return;
        }
        final CidTimestamp callInfo = CID_TIMESTAMP.get();
        final String correlationId = callInfo.correlationId;
        final long callTimestamp = callInfo.timestamp;
        ServerCallStreamObserver<?> callObserver = (ServerCallStreamObserver<?>) streamObserver;
        callObserver.setOnCancelHandler(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("CancelHandler for " + callInfo + " was triggered");
                trackState(correlationId, callTimestamp, "CANCEL_HANDLER");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds and starts the gRPC server on the given port with the Courier service and both
     * interceptors installed, then registers a JVM shutdown hook that shuts the server down.
     *
     * @param i the TCP port to listen on
     * @throws IOException if the server cannot be started
     */
    public void start(int i) throws IOException {
        LOGGER.info("starting gRPC server on port " + i);
        ServerBuilder<?> builder = ServerBuilder.forPort(i)
                .addService(new CourierImpl())
                .intercept(new GrpcServerInterceptor())
                .intercept(new CorrelationIdInterceptor());
        this.server = builder.build().start();
        LOGGER.info("gRPC server started, listening on " + i);
        Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                // Shutdown progress goes to stderr rather than through LOGGER.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                GrpcServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /** Initiates shutdown of the server; does nothing if the server was never started. */
    private void stop() {
        if (this.server == null) {
            return;
        }
        this.server.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Blocks the calling thread until the server has terminated; returns immediately if the
     * server was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void blockUntilShutdown() throws InterruptedException {
        if (this.server == null) {
            return;
        }
        this.server.awaitTermination();
    }
}
