package org.eclipse.hono.application.client.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.amqp.GenericReceiverLink;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.TelemetryConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-application-amqp-1.12.2.jar:org/eclipse/hono/application/client/amqp/ProtonBasedApplicationClient.class */
/**
 * An AMQP (vertx-proton) based application client that creates consumers for
 * telemetry, event and command response messages and, by extending
 * {@link ProtonBasedCommandSender}, also inherits that class' functionality.
 * <p>
 * Close handlers registered together with a consumer are tracked in
 * {@link #consumerCloseHandlers} so that they can be notified when the underlying
 * connection is lost unexpectedly (see {@link #onDisconnect()}).
 */
public class ProtonBasedApplicationClient extends ProtonBasedCommandSender implements AmqpApplicationClient {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedApplicationClient.class);

    /**
     * The (wrapped) close handlers of all consumers that are currently open and
     * that have been created with a non-null close handler.
     */
    private final List<Handler<Throwable>> consumerCloseHandlers = new ArrayList<>();

    /**
     * Creates a client for an existing connection.
     * <p>
     * The super class is configured with a no-op {@link SendMessageSampler} factory.
     *
     * @param honoConnection The connection to use for interacting with the Hono endpoint.
     */
    public ProtonBasedApplicationClient(HonoConnection honoConnection) {
        super(honoConnection, SendMessageSampler.Factory.noop());
    }

    /**
     * Builds a link address of the form {@code <endpoint>/<tenantId>}.
     *
     * @param endpoint The name of the endpoint.
     * @param tenantId The identifier of the tenant.
     * @return The address.
     */
    private String getTenantScopedLinkAddress(String endpoint, String tenantId) {
        return String.format("%s/%s", endpoint, tenantId);
    }

    /**
     * Notifies all registered consumer close handlers about the loss of the
     * connection, unless the connection has been shut down, and then delegates
     * to the super class.
     * <p>
     * Each handler is invoked with a {@link NoStackTraceThrowable} carrying the
     * message {@code disconnected}.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.hono.client.amqp.SenderCachingServiceClient, org.eclipse.hono.client.amqp.AbstractServiceClient
    public void onDisconnect() {
        if (!this.connection.isShutdown()) {
            // Work on a snapshot and empty the registry up front: a handler may react
            // by closing its consumer, which removes entries from the registry and would
            // otherwise trigger a ConcurrentModificationException during the iteration.
            // Clearing first also guarantees that no handler is notified twice if one of
            // the callbacks throws.
            final List<Handler<Throwable>> handlersToNotify = new ArrayList<>(this.consumerCloseHandlers);
            this.consumerCloseHandlers.clear();
            final NoStackTraceThrowable cause = new NoStackTraceThrowable("disconnected");
            handlersToNotify.forEach(closeHandler -> closeHandler.handle(cause));
        }
        super.onDisconnect();
    }

    /**
     * Establishes the underlying connection.
     *
     * @return The future returned by the connection's {@code connect()} method.
     */
    @Override // org.eclipse.hono.client.amqp.AbstractServiceClient, org.eclipse.hono.client.ConnectionLifecycle
    public final Future<HonoConnection> connect() {
        LOG.info("connecting to Hono endpoint");
        return this.connection.connect();
    }

    /**
     * Disconnects from the endpoint without reporting the outcome to the caller.
     */
    @Override // org.eclipse.hono.client.amqp.AbstractServiceClient, org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect() {
        disconnect(Promise.promise());
    }

    /**
     * Disconnects from the endpoint.
     * <p>
     * All registered consumer close handlers are discarded <em>without</em> being
     * invoked before the connection is asked to disconnect.
     *
     * @param handler The handler that is passed on to the connection's
     *                {@code disconnect} method.
     */
    @Override // org.eclipse.hono.client.amqp.AbstractServiceClient, org.eclipse.hono.client.ConnectionLifecycle
    public final void disconnect(Handler<AsyncResult<Void>> handler) {
        LOG.info("disconnecting from Hono endpoint");
        this.consumerCloseHandlers.clear();
        this.connection.disconnect(handler);
    }

    /**
     * Creates a consumer for telemetry messages of a tenant.
     *
     * @param str The identifier of the tenant.
     * @param handler The handler to invoke with every message received. Messages that
     *                the handler does not settle are accepted automatically.
     * @param handler2 The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer once the receiver link has been created.
     * @throws NullPointerException if the tenant identifier or the message handler is {@code null}.
     */
    @Override // org.eclipse.hono.application.client.ApplicationClient
    public final Future<MessageConsumer> createTelemetryConsumer(String str, Handler<DownstreamMessage<AmqpMessageContext>> handler, Handler<Throwable> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        return createConsumer(getTenantScopedLinkAddress(TelemetryConstants.TELEMETRY_ENDPOINT, str), handler, handler2);
    }

    /**
     * Creates a consumer for events of a tenant.
     *
     * @param str The identifier of the tenant.
     * @param handler The handler to invoke with every message received. Messages that
     *                the handler does not settle are accepted automatically.
     * @param handler2 The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer once the receiver link has been created.
     * @throws NullPointerException if the tenant identifier or the message handler is {@code null}.
     */
    @Override // org.eclipse.hono.application.client.ApplicationClient
    public final Future<MessageConsumer> createEventConsumer(String str, Handler<DownstreamMessage<AmqpMessageContext>> handler, Handler<Throwable> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(handler);
        return createConsumer(getTenantScopedLinkAddress("event", str), handler, handler2);
    }

    /**
     * Creates a consumer for command response messages, using the link address
     * {@code command_response/<str>/<str2>}.
     *
     * @param str The first address segment following the endpoint name
     *            (presumably the tenant identifier - verify against the interface).
     * @param str2 The second address segment following the endpoint name
     *             (presumably the reply identifier - verify against the interface).
     * @param handler The handler to invoke with every message received. Messages that
     *                the handler does not settle are accepted automatically.
     * @param handler2 The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer once the receiver link has been created.
     * @throws NullPointerException if any of the parameters except the close handler is {@code null}.
     */
    @Override // org.eclipse.hono.application.client.ApplicationClient
    public Future<MessageConsumer> createCommandResponseConsumer(String str, String str2, Handler<DownstreamMessage<AmqpMessageContext>> handler, Handler<Throwable> handler2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return createConsumer(String.format("%s/%s/%s", "command_response", str, str2), handler, handler2);
    }

    /**
     * Creates a consumer for telemetry messages of a tenant whose messages are
     * processed asynchronously.
     *
     * @param str The identifier of the tenant.
     * @param function The function to invoke with every message received. A message that
     *                 has not been settled once the returned future succeeds is accepted
     *                 automatically; a failed future (or a thrown exception) results in
     *                 the message being released or rejected.
     * @param handler The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer once the receiver link has been created.
     * @throws NullPointerException if the tenant identifier or the function is {@code null}.
     */
    @Override // org.eclipse.hono.application.client.amqp.AmqpApplicationClient
    public final Future<MessageConsumer> createTelemetryConsumer(String str, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> function, Handler<Throwable> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(function);
        return createAsyncConsumer(getTenantScopedLinkAddress(TelemetryConstants.TELEMETRY_ENDPOINT, str), function, handler);
    }

    /**
     * Creates a consumer for events of a tenant whose messages are processed
     * asynchronously.
     *
     * @param str The identifier of the tenant.
     * @param function The function to invoke with every message received. A message that
     *                 has not been settled once the returned future succeeds is accepted
     *                 automatically; a failed future (or a thrown exception) results in
     *                 the message being released or rejected.
     * @param handler The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer once the receiver link has been created.
     * @throws NullPointerException if the tenant identifier or the function is {@code null}.
     */
    @Override // org.eclipse.hono.application.client.amqp.AmqpApplicationClient
    public final Future<MessageConsumer> createEventConsumer(String str, Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> function, Handler<Throwable> handler) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(function);
        return createAsyncConsumer(getTenantScopedLinkAddress("event", str), function, handler);
    }

    /**
     * Creates a consumer that hands every message to a synchronous handler.
     * <p>
     * A message that is still unsettled after the handler has returned is accepted
     * and settled. If the handler throws, the delivery is settled according to
     * {@link #handleMessageHandlerError(Throwable, ProtonDelivery)}.
     *
     * @param address The source address of the receiver link.
     * @param messageHandler The handler to invoke with every message received.
     * @param closeHandler The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer.
     */
    private Future<MessageConsumer> createConsumer(
            String address,
            Handler<DownstreamMessage<AmqpMessageContext>> messageHandler,
            Handler<Throwable> closeHandler) {
        return createConsumer(address, (delivery, message) -> {
            try {
                messageHandler.handle(ProtonBasedDownstreamMessage.from(message, delivery));
                if (!delivery.isSettled()) {
                    LOG.debug("client provided message handler did not settle message, auto-accepting ...");
                    ProtonHelper.accepted(delivery, true);
                }
            } catch (Throwable error) {
                handleMessageHandlerError(error, delivery);
            }
        }, closeHandler);
    }

    /**
     * Opens a receiver link for an address once the connection has been verified
     * to be established and wraps the link in a {@link MessageConsumer}.
     * <p>
     * If a close handler is given, it is registered so that {@link #onDisconnect()}
     * can notify it. The registration is removed again when the consumer is closed
     * or when the callback handed to {@code GenericReceiverLink.create} fires
     * (presumably on the link being closed by the peer - verify against
     * {@code GenericReceiverLink}); in the latter case the close handler is invoked
     * with {@code null}.
     *
     * @param address The source address of the receiver link.
     * @param messageHandler The consumer to invoke with every delivery and message received.
     * @param closeHandler The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer.
     */
    private Future<MessageConsumer> createConsumer(
            String address,
            BiConsumer<ProtonDelivery, Message> messageHandler,
            Handler<Throwable> closeHandler) {
        // Register a wrapper instead of the client's handler itself, presumably so that
        // removal from the registry does not depend on the equals() implementation of
        // the given handler.
        final Handler<Throwable> closeHandlerWrapper = closeHandler != null ? closeHandler::handle : null;
        return this.connection.isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> GenericReceiverLink.create(
                        this.connection,
                        address,
                        messageHandler,
                        false,
                        sourceAddress -> {
                            if (closeHandler != null) {
                                this.consumerCloseHandlers.remove(closeHandlerWrapper);
                                closeHandler.handle(null);
                            }
                        }))
                .onSuccess(receiverLink -> {
                    if (closeHandlerWrapper != null) {
                        this.consumerCloseHandlers.add(closeHandlerWrapper);
                    }
                })
                .map(receiverLink -> {
                    return new MessageConsumer() { // from class: org.eclipse.hono.application.client.amqp.ProtonBasedApplicationClient.1
                        @Override // org.eclipse.hono.application.client.MessageConsumer
                        public Future<Void> close() {
                            if (closeHandlerWrapper != null) {
                                ProtonBasedApplicationClient.this.consumerCloseHandlers.remove(closeHandlerWrapper);
                            }
                            return receiverLink.close();
                        }
                    };
                });
    }

    /**
     * Creates a consumer that hands every message to an asynchronous processing function.
     * <p>
     * A message that is still unsettled when the future returned by the function
     * succeeds is accepted and settled. If the future fails, or if the function
     * throws (this includes returning {@code null}), the delivery is settled according
     * to {@link #handleMessageHandlerError(Throwable, ProtonDelivery)}.
     *
     * @param address The source address of the receiver link.
     * @param messageHandler The function to invoke with every message received.
     * @param closeHandler The close handler for the consumer (may be {@code null}).
     * @return A future completing with the consumer.
     */
    private Future<MessageConsumer> createAsyncConsumer(
            String address,
            Function<DownstreamMessage<AmqpMessageContext>, Future<Void>> messageHandler,
            Handler<Throwable> closeHandler) {
        return createConsumer(address, (delivery, message) -> {
            try {
                messageHandler.apply(ProtonBasedDownstreamMessage.from(message, delivery))
                        .onSuccess(ok -> {
                            if (delivery.isSettled()) {
                                return;
                            }
                            LOG.debug("client provided message handler did not settle message, auto-accepting ...");
                            ProtonHelper.accepted(delivery, true);
                        })
                        .onFailure(error -> handleMessageHandlerError(error, delivery));
            } catch (Throwable error) {
                handleMessageHandlerError(error, delivery);
            }
        }, closeHandler);
    }

    /**
     * Settles a delivery whose processing by the client provided handler has failed.
     * <p>
     * Nothing but logging happens if the delivery is settled already. Otherwise the
     * delivery is settled with the outcome determined by
     * {@link #getDeliveryState(Throwable)}.
     *
     * @param error The error reported by the message handler.
     * @param delivery The delivery of the message that could not be processed.
     */
    private void handleMessageHandlerError(Throwable error, ProtonDelivery delivery) {
        final String localStateName = Optional.ofNullable(delivery.getLocalState())
                .map(state -> state.getType().name())
                .orElse(null);
        // the trailing Throwable is not a format argument but gets logged as the exception
        LOG.debug("client provided message handler threw exception [local state: {}, settled: {}]",
                localStateName, delivery.isSettled(), error);
        if (delivery.isSettled()) {
            return;
        }
        final DeliveryState outcome = getDeliveryState(error);
        LOG.debug("settling transfer [local state: {}]", outcome.getType().name());
        delivery.disposition(outcome, true);
    }

    /**
     * Maps a processing error to the outcome to settle the corresponding delivery with.
     *
     * @param error The error reported by the message handler.
     * @return A {@link Rejected} outcome carrying the error's message if the error is a
     *         {@link ClientErrorException}, a {@link Released} outcome otherwise.
     */
    private DeliveryState getDeliveryState(Throwable error) {
        if (error instanceof ClientErrorException) {
            final Rejected rejected = new Rejected();
            rejected.setError(ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage()));
            return rejected;
        }
        return new Released();
    }
}
