/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.bookkeeper;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.ImportPostProcessor;
import org.apache.sling.distribution.ImportPreProcessException;
import org.apache.sling.distribution.ImportPreProcessor;
import org.apache.sling.distribution.InvalidationProcessException;
import org.apache.sling.distribution.InvalidationProcessor;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.bookkeeper.AppliedEvent;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperConfig;
import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
import org.apache.sling.distribution.journal.bookkeeper.PackageRetries;
import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the processing state of distribution packages on a subscriber.
 *
 * <p>For every package message the book keeper either imports it, invalidates the
 * cache for it, removes it or skips it, and persists the processed offset (and,
 * where applicable, a {@link PackageStatus}) in a {@link LocalStore}. Failed
 * attempts are counted per publisher agent in {@link PackageRetries}; when the
 * error queue is enabled and the configured maximum number of retries is reached,
 * the package is recorded as {@code REMOVED_FAILED} and processing moves on.
 */
public class BookKeeper {
    public static final String STORE_TYPE_STATUS = "statuses";
    public static final String KEY_OFFSET = "offset";
    /** Number of skipped packages after which the skipped offset is persisted. */
    public static final int COMMIT_AFTER_NUM_SKIPPED = 10;
    private static final String SUBSERVICE_IMPORTER = "importer";
    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
    /** Pause in milliseconds applied after a failed attempt to send a stored status. */
    private static final int RETRY_SEND_DELAY = 1000;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ResourceResolverFactory resolverFactory;
    private final SubscriberMetrics subscriberMetrics;
    private final PackageHandler packageHandler;
    private final EventAdmin eventAdmin;
    private final Consumer<PackageStatusMessage> sender;
    private final Consumer<LogMessage> logSender;
    private final BookKeeperConfig config;
    /** True when {@code config.getMaxRetries() >= 0}; a negative value means retry forever. */
    private final boolean errorQueueEnabled;
    private final PackageRetries packageRetries = new PackageRetries();
    private final LocalStore statusStore;
    private final LocalStore processedOffsets;
    private final ImportPreProcessor importPreProcessor;
    private final ImportPostProcessor importPostProcessor;
    private final InvalidationProcessor invalidationProcessor;
    private int skippedCounter = 0;

    /**
     * Creates a book keeper and registers the current retry sum with the metrics.
     *
     * @param resolverFactory factory used to obtain service resource resolvers
     * @param subscriberMetrics metrics updated while processing packages
     * @param packageHandler applies package messages to the repository
     * @param eventAdmin receives an {@link AppliedEvent} after each successful import or invalidation
     * @param sender consumer of package status messages
     * @param logSender consumer of log messages describing failed imports
     * @param config book keeper configuration
     * @param importPreProcessor invoked before a package is applied
     * @param importPostProcessor invoked after a package has been committed
     * @param invalidationProcessor invoked by {@link #invalidateCache(PackageMessage, long)}
     */
    public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, BookKeeperConfig config, ImportPreProcessor importPreProcessor, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) {
        this.packageHandler = packageHandler;
        this.eventAdmin = eventAdmin;
        this.sender = sender;
        this.logSender = logSender;
        this.config = config;
        subscriberMetrics.currentRetries(this.packageRetries::getSum);
        this.resolverFactory = resolverFactory;
        this.subscriberMetrics = subscriberMetrics;
        this.errorQueueEnabled = config.getMaxRetries() >= 0;
        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
        this.processedOffsets = new LocalStore(resolverFactory, config.getPackageNodeName(), config.getSubAgentName());
        this.importPreProcessor = importPreProcessor;
        this.importPostProcessor = importPostProcessor;
        this.invalidationProcessor = invalidationProcessor;
        this.log.info("Started bookkeeper {}.", config);
    }

    /**
     * Imports a package: runs the pre processor, applies the package, stores status
     * (only when the config is editable) and offset, commits, then runs the post
     * processor and posts an {@link AppliedEvent}.
     *
     * @param pkgMsg the package message to import
     * @param offset the offset of the message
     * @param createdTime creation time in milliseconds, used for the distribution duration metric
     * @throws DistributionException if the import failed and will be retried, or if
     *         recording the permanently failed package failed
     */
    public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
        this.log.debug("Importing distribution package {} at offset={}", pkgMsg, offset);
        try (Timer.Context context = this.subscriberMetrics.getImportedPackageDuration().time();
             ResourceResolver importerResolver = this.getServiceResolver(SUBSERVICE_IMPORTER)) {
            this.preProcess(pkgMsg);
            this.packageHandler.apply(importerResolver, pkgMsg);
            if (this.config.isEditable()) {
                this.storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
            }
            this.storeOffset(importerResolver, offset);
            importerResolver.commit();
            this.subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
            this.subscriberMetrics.getPackageDistributedDuration().update(System.currentTimeMillis() - createdTime, TimeUnit.MILLISECONDS);
            this.postProcess(pkgMsg);
            this.clearPackageRetriesOnSuccess(pkgMsg);
            Event event = new AppliedEvent(pkgMsg, this.config.getSubAgentName()).toEvent();
            this.eventAdmin.postEvent(event);
            this.log.info("Imported distribution package {} at offset={}", pkgMsg, offset);
            this.subscriberMetrics.getPackageStatusCounter(PackageStatusMessage.Status.IMPORTED).increment();
        } catch (IOException | RuntimeException | LoginException | ImportPostProcessException | ImportPreProcessException | DistributionException e) {
            this.failure(pkgMsg, offset, e);
        }
    }

    /**
     * Runs the invalidation processor for a package, then stores status (only when
     * the config is editable) and offset, commits and posts an {@link AppliedEvent}.
     *
     * @param pkgMsg the package message whose paths are to be invalidated
     * @param offset the offset of the message
     * @throws DistributionException if the invalidation failed and will be retried, or if
     *         recording the permanently failed package failed
     */
    public void invalidateCache(PackageMessage pkgMsg, long offset) throws DistributionException {
        this.log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset);
        try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            Map<String, Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg);
            long invalidationStartTime = System.currentTimeMillis();
            this.subscriberMetrics.getInvalidationProcessRequest().increment();
            this.invalidationProcessor.process(props);
            if (this.config.isEditable()) {
                this.storeStatus(resolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
            }
            this.storeOffset(resolver, offset);
            resolver.commit();
            this.clearPackageRetriesOnSuccess(pkgMsg);
            Event event = new AppliedEvent(pkgMsg, this.config.getSubAgentName()).toEvent();
            this.eventAdmin.postEvent(event);
            this.log.info("Invalidated the cache for the package {} at offset={}", pkgMsg, offset);
            this.subscriberMetrics.getPackageStatusCounter(PackageStatusMessage.Status.IMPORTED).increment();
            this.subscriberMetrics.getInvalidationProcessDuration().update(System.currentTimeMillis() - invalidationStartTime, TimeUnit.MILLISECONDS);
            this.subscriberMetrics.getInvalidationProcessSuccess().increment();
        } catch (RuntimeException | LoginException | PersistenceException | InvalidationProcessException e) {
            this.failure(pkgMsg, offset, e);
        }
    }

    /**
     * Runs the import pre processor and updates its request/duration/success metrics.
     * Duration and success are only recorded when the processor returns normally.
     */
    private void preProcess(PackageMessage packageMessage) throws ImportPreProcessException {
        this.log.debug("Executing import pre processor for package [{}]", packageMessage);
        Map<String, Object> processorProperties = this.buildProcessorPropertiesFromMessage(packageMessage);
        long preProcessStartTime = System.currentTimeMillis();
        this.subscriberMetrics.getImportPreProcessRequest().increment();
        this.importPreProcessor.process(processorProperties);
        this.log.debug("Executed import pre processor for package [{}]", packageMessage.getPkgId());
        this.subscriberMetrics.getImportPreProcessDuration().update(System.currentTimeMillis() - preProcessStartTime, TimeUnit.MILLISECONDS);
        this.subscriberMetrics.getImportPreProcessSuccess().increment();
    }

    /**
     * Runs the import post processor and updates its request/duration/success metrics.
     * Duration and success are only recorded when the processor returns normally.
     */
    private void postProcess(PackageMessage pkgMsg) throws ImportPostProcessException {
        this.log.debug("Executing import post processor for package [{}]", pkgMsg);
        Map<String, Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg);
        long postProcessStartTime = System.currentTimeMillis();
        this.subscriberMetrics.getImportPostProcessRequest().increment();
        this.importPostProcessor.process(props);
        this.log.debug("Executed import post processor for package [{}]", pkgMsg.getPkgId());
        this.subscriberMetrics.getImportPostProcessDuration().update(System.currentTimeMillis() - postProcessStartTime, TimeUnit.MILLISECONDS);
        this.subscriberMetrics.getImportPostProcessSuccess().increment();
    }

    /**
     * Handles a failed import or invalidation attempt.
     *
     * <p>A log message is sent on a best-effort basis. If the retry budget is not
     * exhausted the retry counter is increased and a {@link DistributionException}
     * is thrown so the caller retries; otherwise the package is recorded as
     * permanently failed and the method returns normally.
     */
    private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
        this.subscriberMetrics.getFailedPackageImports().mark();
        String pubAgentName = pkgMsg.getPubAgentName();
        int retries = this.packageRetries.get(pubAgentName);
        boolean giveUp = this.errorQueueEnabled && retries >= this.config.getMaxRetries();
        String retriesSt = this.errorQueueEnabled ? Integer.toString(this.config.getMaxRetries()) : "infinite";
        String action = giveUp ? "skip the package" : "retry later";
        String msg = String.format("Failed attempt (%s/%s) to import the distribution package %s at offset=%d because of '%s', the importer will %s", retries, retriesSt, pkgMsg.toString(false), offset, e.getMessage(), action);
        try {
            LogMessage logMessage = this.getLogMessage(pubAgentName, msg, e);
            this.logSender.accept(logMessage);
        } catch (Exception e2) {
            // Sending the log message must never mask the original failure.
            this.log.warn("Error sending log message", e2);
        }
        if (!giveUp) {
            this.packageRetries.increase(pubAgentName);
            throw new DistributionException(msg, e);
        }
        this.log.warn(msg, e);
        this.removeFailedPackage(pkgMsg, offset);
        this.subscriberMetrics.getPermanentImportErrors().increment();
    }

    /** Builds a log message carrying the given text and the stack trace of {@code e}. */
    private LogMessage getLogMessage(String pubAgentName, String msg, Exception e) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        e.printStackTrace(pw);
        pw.flush();
        return LogMessage.builder()
                .pubAgentName(pubAgentName)
                .subSlingId(this.config.getSubSlingId())
                .subAgentName(this.config.getSubAgentName())
                .message(msg)
                .stacktrace(sw.getBuffer().toString())
                .build();
    }

    /**
     * Records a package as removed: stores status (only when the config is editable)
     * and offset, commits and clears the retries of the publisher agent.
     *
     * @param pkgMsg the package message to remove
     * @param offset the offset of the message
     * @throws LoginException if the service resolver cannot be obtained
     * @throws PersistenceException if storing or committing fails
     */
    public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
        this.log.info("Removing distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
        Timer.Context context = this.subscriberMetrics.getRemovedPackageDuration().time();
        try {
            try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                if (this.config.isEditable()) {
                    this.storeStatus(resolver, new PackageStatus(PackageStatusMessage.Status.REMOVED, offset, pkgMsg.getPubAgentName()));
                }
                this.storeOffset(resolver, offset);
                resolver.commit();
            }
            this.packageRetries.clear(pkgMsg.getPubAgentName());
        } finally {
            // Stop the timer on failure as well, so the context is never left running.
            context.stop();
        }
        this.subscriberMetrics.getPackageStatusCounter(PackageStatusMessage.Status.REMOVED).increment();
    }

    /**
     * Skips the package at the given offset. The offset is only persisted when
     * {@link #shouldCommitSkipped()} returns true.
     *
     * @param offset the offset of the skipped message
     * @throws LoginException if the service resolver cannot be obtained
     * @throws PersistenceException if storing or committing fails
     */
    public void skipPackage(long offset) throws LoginException, PersistenceException {
        this.log.info("Skipping package at offset={}", offset);
        if (this.shouldCommitSkipped()) {
            try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                this.storeOffset(resolver, offset);
                resolver.commit();
            }
        }
    }

    /**
     * Counts a skipped package and tells whether the skipped offset should be committed now.
     *
     * <p>Returns true once the counter exceeds {@link #COMMIT_AFTER_NUM_SKIPPED}; the
     * counter is then reset to 1, so the first true is returned on call
     * {@code COMMIT_AFTER_NUM_SKIPPED + 1} and afterwards on every
     * {@code COMMIT_AFTER_NUM_SKIPPED}-th call.
     *
     * @return true if the offset of the skipped package should be persisted
     */
    public synchronized boolean shouldCommitSkipped() {
        ++this.skippedCounter;
        if (this.skippedCounter > COMMIT_AFTER_NUM_SKIPPED) {
            this.skippedCounter = 1;
            return true;
        }
        return false;
    }

    /**
     * Sends the stored package status unless it is already marked as sent.
     *
     * @param retry the current retry number, used for logging only
     * @return true if the status was already sent or has been sent now, false if sending failed
     */
    public boolean sendStoredStatus(int retry) {
        PackageStatus status = new PackageStatus(this.statusStore.load());
        return status.sent || this.sendStoredStatus(status, retry);
    }

    /** Sends the status and marks it as sent; on any failure logs, pauses and returns false. */
    private boolean sendStoredStatus(PackageStatus status, int retry) {
        try {
            this.sendStatusMessage(status);
            this.markStatusSent();
            return true;
        } catch (Exception e) {
            // The trailing exception is logged with its stack trace by SLF4J.
            this.log.warn("Cannot send status (retry {})", retry, e);
            BookKeeper.retryDelay();
            return false;
        }
    }

    /** Builds a status message from the stored status and hands it to the sender. */
    private void sendStatusMessage(PackageStatus status) {
        PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
                .subSlingId(this.config.getSubSlingId())
                .subAgentName(this.config.getSubAgentName())
                .pubAgentName(status.pubAgentName)
                .offset(status.offset.longValue())
                .status(status.status)
                .build();
        this.sender.accept(pkgStatMsg);
        this.log.info("Sent status message {}", pkgStatMsg);
    }

    /** Marks the stored status as sent. Failures are logged and not propagated. */
    public void markStatusSent() {
        try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            this.statusStore.store(resolver, "sent", true);
            resolver.commit();
        } catch (Exception e) {
            this.log.warn("Failed to mark status as sent", e);
        }
    }

    /**
     * Loads the last processed offset.
     *
     * @return the stored offset, or -1 if none is stored
     */
    public long loadOffset() {
        return this.processedOffsets.load(KEY_OFFSET, -1L);
    }

    /**
     * @param pubAgentName name of the publisher agent
     * @return the current retry count recorded for that agent
     */
    public int getRetries(String pubAgentName) {
        return this.packageRetries.get(pubAgentName);
    }

    /** @return the retry bookkeeping used by this instance */
    public PackageRetries getPackageRetries() {
        return this.packageRetries;
    }

    /**
     * Clears the retries of the package's publisher agent. If there were retries,
     * the transient import error counter is incremented first.
     *
     * @param pkgMsg the successfully processed package message
     */
    public void clearPackageRetriesOnSuccess(PackageMessage pkgMsg) {
        String pubAgentName = pkgMsg.getPubAgentName();
        if (this.packageRetries.get(pubAgentName) > 0) {
            this.subscriberMetrics.getTransientImportErrors().increment();
        }
        this.packageRetries.clear(pubAgentName);
    }

    /**
     * Stores the given offset if no offset has been stored yet (stored offset is -1).
     * Failures are logged and not propagated.
     *
     * @param offset the initial offset to store
     */
    public void handleInitialOffset(long offset) {
        try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            long currentOffset = this.loadOffset();
            if (currentOffset == -1L) {
                this.log.info("Storing initial offset. packageNodeName={}, subagentName={}, offset={}", this.config.getPackageNodeName(), this.config.getSubAgentName(), offset);
                this.storeOffset(resolver, offset);
                resolver.commit();
            }
        } catch (Exception e) {
            this.log.warn("Error storing initial offset={}", offset, e);
        }
    }

    /**
     * Records a package as permanently failed ({@code REMOVED_FAILED}) and stores its offset.
     *
     * @throws DistributionException wrapping any failure while storing or committing
     */
    private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
        this.log.info("Removing failed distribution package {} at offset={}", pkgMsg, offset);
        Timer.Context context = this.subscriberMetrics.getRemovedFailedPackageDuration().time();
        try (ResourceResolver resolver = this.getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            this.storeStatus(resolver, new PackageStatus(PackageStatusMessage.Status.REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
            this.storeOffset(resolver, offset);
            resolver.commit();
        } catch (Exception e) {
            throw new DistributionException("Error removing failed package", e);
        } finally {
            // Stop the timer on failure as well, so the context is never left running.
            context.stop();
        }
        this.subscriberMetrics.getPackageStatusCounter(PackageStatusMessage.Status.REMOVED_FAILED).increment();
    }

    /** Writes the status to the status store; the caller is responsible for committing. */
    private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
        Map<String, Object> statusMap = packageStatus.asMap();
        this.statusStore.store(resolver, statusMap);
        this.log.info("Stored status {}", statusMap);
    }

    /** Writes the offset to the offset store; the caller is responsible for committing. */
    private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
        this.processedOffsets.store(resolver, KEY_OFFSET, offset);
    }

    /** Obtains a service resource resolver for the given sub service name. */
    private ResourceResolver getServiceResolver(String subService) throws LoginException {
        return this.resolverFactory.getServiceResourceResolver(Collections.singletonMap("sling.service.subservice", subService));
    }

    /** Builds the property map (request type, paths, package id) passed to the processors. */
    private Map<String, Object> buildProcessorPropertiesFromMessage(PackageMessage packageMessage) {
        Map<String, Object> processorProperties = new HashMap<>();
        processorProperties.put("distribution.type", packageMessage.getReqType().name());
        processorProperties.put("distribution.paths", packageMessage.getPaths());
        processorProperties.put("distribution.package.id", packageMessage.getPkgId());
        return processorProperties;
    }

    /**
     * Sleeps for {@link #RETRY_SEND_DELAY} milliseconds. If interrupted, the
     * interrupt flag is restored and the method returns early.
     */
    static void retryDelay() {
        try {
            Thread.sleep(RETRY_SEND_DELAY);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Status of a processed package as persisted in the status store.
     */
    public static class PackageStatus {
        public final PackageStatusMessage.Status status;
        final Long offset;
        final String pubAgentName;
        final Boolean sent;

        /** Creates a new, not yet sent status. */
        PackageStatus(PackageStatusMessage.Status status, long offset, String pubAgentName) {
            this.status = status;
            this.offset = offset;
            this.pubAgentName = pubAgentName;
            this.sent = false;
        }

        /**
         * Restores a status from a stored value map.
         *
         * <p>A missing {@code statusNumber} yields a null status; a missing
         * {@code sent} flag defaults to true, so such a status is not sent by
         * {@link BookKeeper#sendStoredStatus(int)}.
         *
         * @param statusMap the stored properties
         */
        public PackageStatus(ValueMap statusMap) {
            Integer statusNum = statusMap.get("statusNumber", Integer.class);
            this.status = statusNum != null ? PackageStatusMessage.Status.fromNumber(statusNum.intValue()) : null;
            this.offset = statusMap.get(BookKeeper.KEY_OFFSET, Long.class);
            this.pubAgentName = statusMap.get("pubAgentName", String.class);
            this.sent = statusMap.get("sent", Boolean.TRUE);
        }

        /** Converts this status to the property map written to the status store. */
        Map<String, Object> asMap() {
            Map<String, Object> s = new HashMap<>();
            s.put("pubAgentName", this.pubAgentName);
            s.put("statusNumber", this.status.getNumber());
            s.put(BookKeeper.KEY_OFFSET, this.offset);
            s.put("sent", this.sent);
            return s;
        }
    }
}

