/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli.app;

import io.quarkus.runtime.Quarkus;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.cli.app.NorthBoundApis;
import org.eclipse.hono.cli.util.CommandUtils;
import org.eclipse.hono.cli.util.PropertiesVersionProvider;
import picocli.CommandLine;

@Singleton
@CommandLine.Command(name="consume", description={"Consume telemetry and/or event messages from devices."}, mixinStandardHelpOptions=true, versionProvider=PropertiesVersionProvider.class, sortOptions=false)
public class TelemetryAndEvent
implements Callable<Integer> {
    private static final String MESSAGE_TYPE_TELEMETRY = "telemetry";
    private static final String MESSAGE_TYPE_EVENT = "event";
    @CommandLine.ParentCommand
    NorthBoundApis appCommand;
    @CommandLine.Option(names={"-t", "--tenant"}, description={"The tenant to consume messages for (default: ${DEFAULT-VALUE})."}, defaultValue="DEFAULT_TENANT", order=15, scope=CommandLine.ScopeType.INHERIT)
    String tenantId;
    @CommandLine.Option(names={"--telemetry"}, description={"Consume telemetry messages.", "If not specified, both telemetry and event messages will be consumed.", "Messages are printed to standard out one message per line using the following format:", "t 4711 text/plain This is the message payload {key1=value1,key2=value2,...}"}, order=20)
    boolean consumeTelemetry;
    @CommandLine.Option(names={"--event"}, description={"Consume event messages.", "If not specified, both telemetry and event messages will be consumed.", "Messages are printed to standard out one message per line using the following format:", "e 4711 text/plain This is the message payload {key1=value1,key2=value2,...}"}, order=21)
    boolean consumeEvent;
    @Inject
    Vertx vertx;
    private final Set<String> supportedMessageTypes = new HashSet<String>();

    private Future<Void> createConsumers(ApplicationClient<? extends MessageContext> client) {
        Handler<Throwable> closeHandler = cause -> {
            System.err.println("peer has closed message consumer(s) unexpectedly, trying to reopen ...");
            this.vertx.setTimer(1000L, reconnect -> this.createConsumers(client));
        };
        ArrayList<Future> consumerFutures = new ArrayList<Future>();
        if (this.supportedMessageTypes.contains(MESSAGE_TYPE_EVENT)) {
            consumerFutures.add(client.createEventConsumer(this.tenantId, msg -> this.printMessage(MESSAGE_TYPE_EVENT, (DownstreamMessage<? extends MessageContext>)msg), closeHandler));
        }
        if (this.supportedMessageTypes.contains(MESSAGE_TYPE_TELEMETRY)) {
            consumerFutures.add(client.createTelemetryConsumer(this.tenantId, msg -> this.printMessage(MESSAGE_TYPE_TELEMETRY, (DownstreamMessage<? extends MessageContext>)msg), closeHandler));
        }
        return CompositeFuture.all(consumerFutures).mapEmpty();
    }

    private void printMessage(String endpoint, DownstreamMessage<? extends MessageContext> message) {
        System.out.println("%s %s %s %s %s".formatted(Character.valueOf(endpoint.charAt(0)), message.getDeviceId(), Optional.ofNullable(message.getContentType()).orElse("-"), Optional.ofNullable(message.getPayload()).map(Buffer::toString).orElse("-"), message.getProperties().getPropertiesMap()));
    }

    @Override
    public Integer call() {
        if (this.consumeEvent) {
            this.supportedMessageTypes.add(MESSAGE_TYPE_EVENT);
        }
        if (this.consumeTelemetry) {
            this.supportedMessageTypes.add(MESSAGE_TYPE_TELEMETRY);
        }
        if (this.supportedMessageTypes.isEmpty()) {
            this.supportedMessageTypes.add(MESSAGE_TYPE_EVENT);
            this.supportedMessageTypes.add(MESSAGE_TYPE_TELEMETRY);
        }
        try {
            this.appCommand.getApplicationClient().compose(this::createConsumers).onSuccess(ok -> System.err.println("Consuming messages for tenant [%s], ctrl-c to exit.\n".formatted(this.tenantId))).toCompletionStage().toCompletableFuture().join();
            Quarkus.waitForExit();
            return 0;
        }
        catch (CompletionException e) {
            CommandUtils.printError(e.getCause());
            System.err.println("failed to create message consumer(s): %s".formatted(e.getMessage()));
            return 1;
        }
    }
}

