package org.eclipse.hono.application.client.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.StatusCodeMapper;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.impl.CachingClientFactory;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/hono-client-application-amqp-1.12.2.jar:org/eclipse/hono/application/client/amqp/ProtonBasedRequestResponseCommandClient.class */
/**
 * A vertx-proton based client for sending a command as a request and waiting for the
 * corresponding response message.
 * <p>
 * Request/response links are obtained from the inherited client factory and are cached under
 * a key derived from the tenant identifier (see {@link #getKey(String)}).
 * <p>
 * NOTE(review): the message ID counter is a plain {@code int} without synchronization; this
 * presumably relies on all invocations happening on the connection's Vert.x context - confirm.
 */
public final class ProtonBasedRequestResponseCommandClient extends AbstractRequestResponseServiceClient<DownstreamMessage<AmqpMessageContext>, RequestResponseResult<DownstreamMessage<AmqpMessageContext>>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtonBasedRequestResponseCommandClient.class);

    /** Request timeout applied when the caller does not pass an explicit timeout. */
    private static final long DEFAULT_COMMAND_TIMEOUT_IN_MS = 10000;
    /** Endpoint name used for the cache key, the target address and the message sampler. */
    private static final String COMMAND_ENDPOINT = "command";
    /** Endpoint name passed to the request/response client as the reply endpoint. */
    private static final String COMMAND_RESPONSE_ENDPOINT = "command_response";
    /** Status code reported when no result could be created from the response message. */
    private static final int HTTP_BAD_REQUEST = 400;

    /** Counter from which message IDs are derived, see {@link #createMessageId()}. */
    private int messageCounter;

    /**
     * Creates a new client for a connection.
     *
     * @param connection The connection that the request/response links are created on.
     * @param samplerFactory The factory used for creating the message sampler of a new link.
     * @throws NullPointerException if {@code connection} is {@code null}.
     */
    public ProtonBasedRequestResponseCommandClient(final HonoConnection connection, final SendMessageSampler.Factory samplerFactory) {
        super(connection,
                samplerFactory,
                // cached clients are considered usable for as long as they report being open
                new CachingClientFactory<>(connection.getVertx(), RequestResponseClient::isOpen),
                null);
    }

    /**
     * Builds the key under which the client for a tenant is cached.
     *
     * @param tenantId The tenant identifier.
     * @return The key, i.e. {@code command-<tenantId>}.
     */
    @Override // org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient
    protected String getKey(final String tenantId) {
        return String.format("%s-%s", COMMAND_ENDPOINT, tenantId);
    }

    /**
     * Sends a command and waits for the response.
     * <p>
     * A tracing span named <em>send command and receive response</em> is created as a child of
     * the given context and is finished once the returned future completes.
     *
     * @param tenantId The tenant identifier; used for looking up the cached client and for
     *                 building the target address.
     * @param deviceId The device identifier; used for building the target address.
     * @param command The command; passed as the first argument of the request being created
     *                (presumably the message subject - confirm against {@code RequestResponseClient}).
     * @param contentType The content type of the payload, may be {@code null}.
     * @param data The payload of the request, may be {@code null}.
     * @param replyId The reply identifier to use if a new client needs to be created. A random
     *                UUID is used if {@code null}. The value has no effect if a client for the
     *                tenant is already cached.
     * @param properties The properties to include in the request, may be {@code null}.
     * @param timeout The request timeout, or {@code null} to use the default of
     *                {@value #DEFAULT_COMMAND_TIMEOUT_IN_MS} milliseconds.
     * @param context The span context to use as the parent of the tracing span, may be {@code null}.
     * @return A future that succeeds with the response message, or fails if the request could
     *         not be sent, if no result could be created from the response (status 400) or if
     *         the response indicates an error status.
     * @throws NullPointerException if tenant ID, device ID or command is {@code null}.
     * @throws IllegalArgumentException if the timeout is negative.
     */
    public Future<DownstreamMessage<AmqpMessageContext>> sendCommand(
            final String tenantId,
            final String deviceId,
            final String command,
            final String contentType,
            final Buffer data,
            final String replyId,
            final Map<String, Object> properties,
            final Duration timeout,
            final SpanContext context) {

        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(command);

        // validate the timeout before the span gets created so that no span is left unfinished
        final long timeoutInMs;
        if (timeout == null) {
            timeoutInMs = DEFAULT_COMMAND_TIMEOUT_IN_MS;
        } else if (timeout.isNegative()) {
            throw new IllegalArgumentException("command timeout duration must be >= 0");
        } else {
            timeoutInMs = timeout.toMillis();
        }

        final Span currentSpan = newChildSpan(context, "send command and receive response");

        return getOrCreateClient(tenantId, replyId)
                .map(client -> {
                    client.setRequestTimeout(timeoutInMs);
                    return client;
                })
                .compose(client -> {
                    final String targetAddress = AddressHelper.getTargetAddress(
                            COMMAND_ENDPOINT,
                            tenantId,
                            deviceId,
                            this.connection.getConfig());
                    return client.createAndSendRequest(
                            command,
                            targetAddress,
                            properties,
                            data,
                            contentType,
                            this::mapCommandResponse,
                            currentSpan);
                })
                .recover(error -> {
                    // record the failure in the span but propagate the original error unchanged
                    Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(error));
                    TracingHelper.logError(currentSpan, error);
                    return Future.failedFuture(error);
                })
                .compose(result -> {
                    if (result == null) {
                        // mapCommandResponse returns null if the response carries no status code
                        return Future.failedFuture(new ClientErrorException(HTTP_BAD_REQUEST));
                    }
                    final DownstreamMessage<AmqpMessageContext> response = result.getPayload();
                    setTagsForResult(currentSpan, result);
                    if (result.isError()) {
                        return Future.failedFuture(
                                StatusCodeMapper.from(result.getStatus(), getDetailMessage(response)));
                    }
                    return Future.succeededFuture(response);
                })
                .onComplete(attempt -> currentSpan.finish());
    }

    /**
     * Extracts the UTF-8 decoded payload of a response message for use as an error detail message.
     *
     * @param response The response message, may be {@code null}.
     * @return The decoded payload or {@code null} if there is no message or its payload is
     *         missing or empty.
     */
    private static String getDetailMessage(final DownstreamMessage<AmqpMessageContext> response) {
        if (response == null) {
            return null;
        }
        final Buffer payload = response.getPayload();
        if (payload == null || payload.length() <= 0) {
            return null;
        }
        return payload.toString(StandardCharsets.UTF_8);
    }

    /**
     * Creates a result object from a response message.
     *
     * @param message The response message.
     * @param delivery The delivery that the message has been received with.
     * @return The result or {@code null} if the message does not contain a status code.
     */
    private RequestResponseResult<DownstreamMessage<AmqpMessageContext>> mapCommandResponse(final Message message, final ProtonDelivery delivery) {
        // the downstream message is created before the status check, as in the original code
        final DownstreamMessage<AmqpMessageContext> downstreamMessage = ProtonBasedDownstreamMessage.from(message, delivery);
        final Integer status = MessageHelper.getStatus(message);
        if (status == null) {
            LOGGER.warn("response message has no status code application property [reply-to: {}, correlation ID: {}]", message.getReplyTo(), message.getCorrelationId());
            return null;
        }
        return new RequestResponseResult<>(
                status,
                downstreamMessage,
                CacheDirective.from(MessageHelper.getCacheDirective(message)),
                null);
    }

    /**
     * Not supported by this client.
     * <p>
     * {@link #sendCommand} passes {@code mapCommandResponse} as the response mapper when
     * sending a request; NOTE(review): this method is presumably never invoked for that
     * reason - confirm against the super class.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override // org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient
    protected RequestResponseResult<DownstreamMessage<AmqpMessageContext>> getResult(
            final int status,
            final String contentType,
            final Buffer payload,
            final CacheDirective cacheDirective,
            final ApplicationProperties applicationProperties) {
        throw new UnsupportedOperationException();
    }

    /**
     * Gets the cached request/response client for a tenant or creates a new one.
     * <p>
     * The connection is checked for being established first, the lookup itself is executed
     * on the connection's context.
     *
     * @param tenantId The tenant identifier.
     * @param replyId The reply identifier to create a new client with, a random UUID is used
     *                if {@code null}.
     * @return A future indicating the outcome of the lookup.
     */
    private Future<RequestResponseClient<RequestResponseResult<DownstreamMessage<AmqpMessageContext>>>> getOrCreateClient(final String tenantId, final String replyId) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(result -> {
                    this.clientFactory.getOrCreateClient(
                            getKey(tenantId),
                            () -> RequestResponseClient.forEndpoint(
                                    this.connection,
                                    COMMAND_ENDPOINT,
                                    COMMAND_RESPONSE_ENDPOINT,
                                    tenantId,
                                    // only generate a random ID if the caller did not provide one
                                    replyId != null ? replyId : UUID.randomUUID().toString(),
                                    this::createMessageId,
                                    this.samplerFactory.create(COMMAND_ENDPOINT),
                                    this::removeClient,
                                    this::removeClient),
                            result);
                }));
    }

    /**
     * Creates the identifier for the next request message.
     *
     * @return The incremented message counter encoded with radix 36.
     */
    private String createMessageId() {
        this.messageCounter++;
        return Long.toString(this.messageCounter, Character.MAX_RADIX);
    }
}
