/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.amqp;

import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.amqp.AbstractHonoClient;
import org.eclipse.hono.client.amqp.config.RequestResponseClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.AmqpUtils;
import org.eclipse.hono.client.amqp.connection.ErrorConverter;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.HonoProtonHelper;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.eclipse.hono.util.TriTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestResponseClient<R extends RequestResponseResult<?>>
extends AbstractHonoClient {
    private static final Logger LOG = LoggerFactory.getLogger(RequestResponseClient.class);
    private final String linkTargetAddress;
    private final String replyToAddress;
    private final SendMessageSampler sampler;
    private final Map<Object, TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span>> replyMap = new HashMap<Object, TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span>>();
    private final String requestEndPointName;
    private final String responseEndPointName;
    private final String tenantId;
    private final Supplier<String> messageIdSupplier;
    private long requestTimeoutMillis;

    private RequestResponseClient(HonoConnection connection, String requestEndPointName, String responseEndPointName, String tenantId, String replyId, Supplier<String> messageIdSupplier, SendMessageSampler sampler) {
        super(connection);
        this.requestEndPointName = Objects.requireNonNull(requestEndPointName);
        this.responseEndPointName = Objects.requireNonNull(responseEndPointName);
        Objects.requireNonNull(replyId);
        this.sampler = Objects.requireNonNull(sampler);
        this.requestTimeoutMillis = connection.getConfig().getRequestTimeout();
        this.messageIdSupplier = Optional.ofNullable(messageIdSupplier).orElse(this::createMessageId);
        if (tenantId == null) {
            this.linkTargetAddress = requestEndPointName;
            this.replyToAddress = String.format("%s/%s", responseEndPointName, replyId);
        } else {
            this.linkTargetAddress = String.format("%s/%s", requestEndPointName, tenantId);
            this.replyToAddress = String.format("%s/%s/%s", responseEndPointName, tenantId, replyId);
        }
        this.tenantId = tenantId;
    }

    public static <T extends RequestResponseResult<?>> Future<RequestResponseClient<T>> forEndpoint(HonoConnection connection, String endpointName, String tenantId, SendMessageSampler sampler, Handler<String> senderCloseHook, Handler<String> receiverCloseHook) {
        return RequestResponseClient.forEndpoint(connection, endpointName, endpointName, tenantId, UUID.randomUUID().toString(), null, sampler, senderCloseHook, receiverCloseHook);
    }

    public static <T extends RequestResponseResult<?>> Future<RequestResponseClient<T>> forEndpoint(HonoConnection connection, String requestEndPointName, String responseEndPointName, String tenantId, String replyId, Supplier<String> messageIdSupplier, SendMessageSampler sampler, Handler<String> senderCloseHook, Handler<String> receiverCloseHook) {
        RequestResponseClient result = new RequestResponseClient(connection, requestEndPointName, responseEndPointName, tenantId, replyId, messageIdSupplier, sampler);
        return result.createLinks(senderCloseHook, receiverCloseHook).map(result);
    }

    protected final long getResponseCacheDefaultTimeout() {
        if (this.connection.getConfig() instanceof RequestResponseClientConfigProperties) {
            return ((RequestResponseClientConfigProperties)this.connection.getConfig()).getResponseCacheDefaultTimeout();
        }
        return 600L;
    }

    public final void setRequestTimeout(long timoutMillis) {
        if (timoutMillis < 0L) {
            throw new IllegalArgumentException("request timeout must be >= 0");
        }
        this.requestTimeoutMillis = timoutMillis;
    }

    private String createMessageId() {
        return String.format("%s-client-%s", this.requestEndPointName, UUID.randomUUID());
    }

    public final Future<Void> createLinks() {
        return this.createLinks(null, null);
    }

    public final Future<Void> createLinks(Handler<String> senderCloseHook, Handler<String> receiverCloseHook) {
        return this.createReceiver(this.replyToAddress, receiverCloseHook).compose(recv -> {
            this.receiver = recv;
            return this.createSender(this.linkTargetAddress, senderCloseHook);
        }).compose(sender -> {
            LOG.debug("request-response client for peer [{}] created", (Object)this.connection.getConfig().getHost());
            this.offeredCapabilities = Optional.ofNullable(sender.getRemoteOfferedCapabilities()).map(caps -> Collections.unmodifiableList(Arrays.asList(caps))).orElse(Collections.emptyList());
            this.sender = sender;
            return Future.succeededFuture();
        });
    }

    private Future<ProtonSender> createSender(String targetAddress, Handler<String> closeHook) {
        return this.connection.createSender(targetAddress, ProtonQoS.AT_LEAST_ONCE, closeHook);
    }

    private Future<ProtonReceiver> createReceiver(String sourceAddress, Handler<String> closeHook) {
        return this.connection.createReceiver(sourceAddress, ProtonQoS.AT_LEAST_ONCE, this::handleResponse, closeHook);
    }

    private void handleResponse(ProtonDelivery delivery, Message message) {
        TriTuple<Promise<R>, BiFunction<Message, ProtonDelivery, R>, Span> handler = this.replyMap.remove(message.getCorrelationId());
        if (handler == null) {
            LOG.debug("discarding unexpected response [reply-to: {}, correlation ID: {}]", (Object)this.replyToAddress, message.getCorrelationId());
            ProtonHelper.rejected(delivery, true);
        } else {
            RequestResponseResult response = (RequestResponseResult)handler.two().apply(message, delivery);
            Span span = handler.three();
            if (response == null) {
                LOG.debug("discarding malformed response [reply-to: {}, correlation ID: {}]", (Object)this.replyToAddress, message.getCorrelationId());
                handler.one().handle(Future.failedFuture(new ServerErrorException(500, "cannot process response from service [" + this.responseEndPointName + "]")));
                ProtonHelper.released(delivery, true);
            } else {
                LOG.debug("received response [reply-to: {}, subject: {}, correlation ID: {}, status: {}, cache-directive: {}]", this.replyToAddress, message.getSubject(), message.getCorrelationId(), response.getStatus(), response.getCacheDirective());
                if (span != null) {
                    span.log("response from peer accepted");
                }
                handler.one().handle((AsyncResult<R>)Future.succeededFuture(response));
                if (!delivery.isSettled()) {
                    LOG.debug("client provided response handler did not settle message, auto-accepting ...");
                    ProtonHelper.accepted(delivery, true);
                }
            }
        }
    }

    private boolean cancelRequest(Object correlationId, AsyncResult<R> result) {
        Objects.requireNonNull(correlationId);
        Objects.requireNonNull(result);
        if (result.succeeded()) {
            throw new IllegalArgumentException("result must be failed");
        }
        return this.cancelRequest(correlationId, result::cause);
    }

    private boolean cancelRequest(Object correlationId, Supplier<Throwable> exceptionSupplier) {
        Objects.requireNonNull(correlationId);
        Objects.requireNonNull(exceptionSupplier);
        return Optional.ofNullable(this.replyMap.remove(correlationId)).map(handler -> {
            Throwable throwable = (Throwable)exceptionSupplier.get();
            LOG.debug("canceling request [target: {}, correlation ID: {}]: {}", this.linkTargetAddress, correlationId, throwable.getMessage());
            ((Promise)handler.one()).fail(throwable);
            return true;
        }).orElse(false);
    }

    private Message createMessage(String subject, String address, Map<String, Object> appProperties) {
        Objects.requireNonNull(subject);
        Message msg = ProtonHelper.message();
        AbstractHonoClient.setApplicationProperties(msg, appProperties);
        msg.setAddress(address);
        msg.setReplyTo(this.replyToAddress);
        msg.setMessageId(this.messageIdSupplier.get());
        msg.setSubject(subject);
        return msg;
    }

    public final Future<R> createAndSendRequest(String action, Map<String, Object> properties, Buffer payload, String contentType, Function<Message, R> responseMapper, Span currentSpan) {
        return this.createAndSendRequest(action, this.linkTargetAddress, properties, payload, contentType, responseMapper, currentSpan);
    }

    public final Future<R> createAndSendRequest(String action, String address, Map<String, Object> properties, Buffer payload, String contentType, Function<Message, R> responseMapper, Span currentSpan) {
        Objects.requireNonNull(responseMapper);
        return this.createAndSendRequest(action, address, properties, payload, contentType, (Message message, ProtonDelivery delivery) -> (RequestResponseResult)responseMapper.apply((Message)message), currentSpan);
    }

    public final Future<R> createAndSendRequest(String action, String address, Map<String, Object> properties, Buffer payload, String contentType, BiFunction<Message, ProtonDelivery, R> responseMapper, Span currentSpan) {
        Objects.requireNonNull(action);
        Objects.requireNonNull(currentSpan);
        Objects.requireNonNull(responseMapper);
        if (this.isOpen()) {
            Message request = this.createMessage(action, address, properties);
            AmqpUtils.setPayload(request, contentType, payload);
            return this.sendRequest(request, responseMapper, currentSpan);
        }
        return Future.failedFuture(new ServerErrorException(503, "sender and/or receiver link is not open"));
    }

    private Future<R> sendRequest(Message request, BiFunction<Message, ProtonDelivery, R> responseMapper, Span currentSpan) {
        String requestTargetAddress = Optional.ofNullable(request.getAddress()).orElse(this.linkTargetAddress);
        Tags.MESSAGE_BUS_DESTINATION.set(currentSpan, requestTargetAddress);
        Tags.SPAN_KIND.set(currentSpan, "client");
        Tags.HTTP_METHOD.set(currentSpan, request.getSubject());
        if (this.tenantId != null) {
            currentSpan.setTag("tenant_id", this.tenantId);
        }
        return this.connection.executeOnContext(res -> {
            if (this.sender.sendQueueFull()) {
                LOG.debug("cannot send request to peer, no credit left for link [link target: {}]", (Object)this.linkTargetAddress);
                res.fail(new ServerErrorException(503, "no credit available for sending request"));
                this.sampler.noCredit(this.tenantId);
            } else {
                HashMap<String, Object> details = new HashMap<String, Object>(3);
                Object correlationId = Optional.ofNullable(request.getCorrelationId()).orElse(request.getMessageId());
                if (correlationId instanceof String) {
                    details.put(TracingHelper.TAG_CORRELATION_ID.getKey(), correlationId);
                }
                details.put(TracingHelper.TAG_CREDIT.getKey(), this.sender.getCredit());
                details.put(TracingHelper.TAG_QOS.getKey(), this.sender.getQoS().toString());
                currentSpan.log(details);
                TriTuple<Promise, BiFunction, Span> handler = TriTuple.of(res, responseMapper, currentSpan);
                AmqpUtils.injectSpanContext(this.connection.getTracer(), currentSpan.context(), request);
                this.replyMap.put(correlationId, handler);
                SendMessageSampler.Sample sample = this.sampler.start(this.tenantId);
                this.sender.send(request, deliveryUpdated -> {
                    Promise failedResult = Promise.promise();
                    DeliveryState remoteState = deliveryUpdated.getRemoteState();
                    sample.completed(remoteState);
                    if (remoteState instanceof Rejected) {
                        Rejected rejected = (Rejected)remoteState;
                        if (rejected.getError() != null) {
                            LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]: {}", requestTargetAddress, request.getSubject(), correlationId, rejected.getError());
                            failedResult.fail(ErrorConverter.fromTransferError(rejected.getError()));
                            this.cancelRequest(correlationId, failedResult.future());
                        } else {
                            LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]", requestTargetAddress, request.getSubject(), correlationId);
                            failedResult.fail(new ClientErrorException(400));
                            this.cancelRequest(correlationId, failedResult.future());
                        }
                    } else if (remoteState instanceof Accepted) {
                        LOG.trace("service has accepted request [target address: {}, subject: {}, correlation ID: {}]", requestTargetAddress, request.getSubject(), correlationId);
                        currentSpan.log("request accepted by peer");
                        if (request.getReplyTo() == null) {
                            if (this.replyMap.remove(correlationId) != null) {
                                res.complete();
                            } else {
                                LOG.trace("accepted request won't be acted upon, request already cancelled [target address: {}, subject: {}, correlation ID: {}]", requestTargetAddress, request.getSubject(), correlationId);
                            }
                        }
                    } else if (remoteState instanceof Released) {
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}", requestTargetAddress, request.getSubject(), correlationId, remoteState);
                        failedResult.fail(new ServerErrorException(503));
                        this.cancelRequest(correlationId, failedResult.future());
                    } else if (remoteState instanceof Modified) {
                        Modified modified = (Modified)remoteState;
                        LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}", requestTargetAddress, request.getSubject(), correlationId, remoteState);
                        failedResult.fail(modified.getUndeliverableHere() != false ? new ClientErrorException(404) : new ServerErrorException(503));
                        this.cancelRequest(correlationId, failedResult.future());
                    } else if (remoteState == null) {
                        String furtherInfo = !this.sender.isOpen() ? ", sender link was closed in between" : "";
                        LOG.warn("got undefined delivery state for service request{} [target address: {}, subject: {}, correlation ID: {}]", furtherInfo, requestTargetAddress, request.getSubject(), correlationId);
                        failedResult.fail(new ServerErrorException(503));
                        this.cancelRequest(correlationId, failedResult.future());
                    }
                });
                if (this.requestTimeoutMillis > 0L) {
                    this.connection.getVertx().setTimer(this.requestTimeoutMillis, tid -> {
                        if (this.cancelRequest(correlationId, () -> new ServerErrorException(503, "request timed out after " + this.requestTimeoutMillis + "ms"))) {
                            sample.timeout();
                        }
                    });
                }
                if (LOG.isDebugEnabled()) {
                    String deviceId = AmqpUtils.getDeviceId(request);
                    if (deviceId == null) {
                        LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}] to service", requestTargetAddress, request.getSubject(), correlationId);
                    } else {
                        LOG.debug("sent request [target address: {}, subject: {}, correlation ID: {}, device ID: {}] to service", requestTargetAddress, request.getSubject(), correlationId, deviceId);
                    }
                }
            }
        });
    }

    public final boolean isOpen() {
        return HonoProtonHelper.isLinkOpenAndConnected(this.sender) && HonoProtonHelper.isLinkOpenAndConnected(this.receiver);
    }

    public final Future<Void> close() {
        LOG.debug("closing request-response client ...");
        return this.closeLinks();
    }
}

