package org.sensorhub.impl.persistence;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import net.opengis.gml.v32.AbstractFeature;
import net.opengis.sensorml.v20.AbstractProcess;
import net.opengis.swe.v20.DataBlock;
import net.opengis.swe.v20.DataComponent;
import net.opengis.swe.v20.DataEncoding;
import org.sensorhub.api.common.Event;
import org.sensorhub.api.common.IEventListener;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.data.DataEvent;
import org.sensorhub.api.data.FoiEvent;
import org.sensorhub.api.data.IDataProducerModule;
import org.sensorhub.api.data.IMultiSourceDataProducer;
import org.sensorhub.api.data.IStreamingDataInterface;
import org.sensorhub.api.persistence.DataKey;
import org.sensorhub.api.persistence.IBasicStorage;
import org.sensorhub.api.persistence.IDataFilter;
import org.sensorhub.api.persistence.IDataRecord;
import org.sensorhub.api.persistence.IFoiFilter;
import org.sensorhub.api.persistence.IMultiSourceStorage;
import org.sensorhub.api.persistence.IObsStorage;
import org.sensorhub.api.persistence.IRecordStorageModule;
import org.sensorhub.api.persistence.IRecordStoreInfo;
import org.sensorhub.api.persistence.IStorageModule;
import org.sensorhub.api.persistence.ObsKey;
import org.sensorhub.api.persistence.StorageConfig;
import org.sensorhub.api.persistence.StorageException;
import org.sensorhub.api.sensor.SensorEvent;
import org.sensorhub.impl.SensorHub;
import org.sensorhub.impl.module.AbstractModule;
import org.sensorhub.utils.MsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vast.swe.SWEHelper;
import org.vast.swe.ScalarIndexer;
import org.vast.util.Bbox;

/* loaded from: input_file:org/sensorhub/impl/persistence/GenericStreamStorage.class */
public class GenericStreamStorage extends AbstractModule<StreamStorageConfig> implements IRecordStorageModule<StreamStorageConfig>, IObsStorage, IEventListener {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(GenericStreamStorage.class);

    /** Underlying storage that all storage calls are delegated to; assigned in {@link #start()}. */
    IRecordStorageModule<StorageConfig> storage;

    /** Weak reference to the data producer module, obtained from the module registry in {@link #start()}. */
    WeakReference<IDataProducerModule<?>> dataSourceRef;

    /** Time stamp indexers keyed by output name. */
    Map<String, ScalarIndexer> timeStampIndexers = new HashMap<String, ScalarIndexer>();

    /** ID of the current feature of interest, keyed by producer (entity) ID. */
    Map<String, String> currentFoiMap = new HashMap<String, String>();

    /** System time (ms) of the last commit triggered by an event; {@code Long.MIN_VALUE} until the first one. */
    long lastCommitTime = Long.MIN_VALUE;

    /** ID of the current feature of interest used for records that carry no related entity ID. */
    String currentFoi;

    /** Timer driving the periodic auto-purge task; only created when auto-purge is enabled. */
    Timer autoPurgeTimer;

    /**
     * Starts this stream storage.
     * <p>
     * Creates the underlying storage from a copy of the configured storage
     * configuration (re-using this module's local ID and name), initializes and
     * starts it, then looks up the data source in the module registry. When the
     * data source is available, its description and current features of interest
     * are stored, and this module registers itself as listener on the selected
     * outputs and on the data source itself; otherwise a warning is logged.
     * Finally the auto-purge task is scheduled if it is enabled in the
     * configuration.
     * </p>
     *
     * @throws SensorHubException (as a {@code StorageException}) if no underlying
     *         storage configuration is provided or if any step above fails
     */
    @Override // org.sensorhub.api.module.IModule
    public void start() throws SensorHubException {
        final StreamStorageConfig cfg = (StreamStorageConfig) this.config;

        // explicit check rather than relying on a NullPointerException being caught below
        if (cfg.storageConfig == null) {
            throw new StorageException("Underlying storage configuration must be provided for generic storage " + cfg.name);
        }

        try {
            // work on a copy so the configured template is left untouched
            StorageConfig storageConfig = (StorageConfig) cfg.storageConfig.m3clone();
            storageConfig.id = getLocalID();
            storageConfig.name = getName();

            // Constructor.newInstance wraps constructor failures instead of
            // leaking undeclared checked exceptions like Class.newInstance does
            @SuppressWarnings("unchecked")
            IRecordStorageModule<StorageConfig> newStorage =
                    (IRecordStorageModule<StorageConfig>) Class.forName(storageConfig.moduleClass).getDeclaredConstructor().newInstance();
            this.storage = newStorage;
            this.storage.init(storageConfig);
            this.storage.start();

            this.dataSourceRef = SensorHub.getInstance().getModuleRegistry().getModuleRef(cfg.dataSourceID);
            IDataProducerModule<?> dataSource = this.dataSourceRef.get();
            if (dataSource != null) {
                // first use of this storage: create record stores; otherwise refresh the description
                if (this.storage.getLatestDataSourceDescription() == null) {
                    configureStorageForDataSource(dataSource, this.storage);
                } else if (dataSource.getLastDescriptionUpdate() != Long.MIN_VALUE) {
                    this.storage.storeDataSourceDescription(dataSource.getCurrentDescription());
                }

                // remember (and store when supported) the current feature(s) of interest
                if (dataSource instanceof IMultiSourceDataProducer) {
                    IMultiSourceDataProducer multiSource = (IMultiSourceDataProducer) dataSource;
                    for (String entityID : multiSource.getEntityIDs()) {
                        AbstractFeature foi = multiSource.getCurrentFeatureOfInterest(entityID);
                        if (foi != null) {
                            this.currentFoiMap.put(entityID, foi.getUniqueIdentifier());
                            if (this.storage instanceof IObsStorage) {
                                ((IObsStorage) this.storage).storeFoi(entityID, foi);
                            }
                        }
                    }
                } else {
                    String producerID = dataSource.getCurrentDescription().getUniqueIdentifier();
                    AbstractFeature foi = dataSource.getCurrentFeatureOfInterest();
                    if (foi != null) {
                        this.currentFoi = foi.getUniqueIdentifier();
                        this.currentFoiMap.put(producerID, this.currentFoi);
                        if (this.storage instanceof IObsStorage) {
                            ((IObsStorage) this.storage).storeFoi(producerID, foi);
                        }
                    }
                }

                // listen to every selected output and to the data source itself
                for (IStreamingDataInterface output : getSelectedOutputs(dataSource)) {
                    prepareToReceiveEvents(output);
                }
                dataSource.registerListener(this);
            } else {
                log.warn("Data source is unavailable for stream storage " + MsgUtils.moduleString(this));
            }

            if (cfg.autoPurgeConfig != null && cfg.autoPurgeConfig.enabled) {
                final IStorageAutoPurgePolicy policy = cfg.autoPurgeConfig.getPolicy();

                // do not leak a timer thread left over from a previous start
                if (this.autoPurgeTimer != null) {
                    this.autoPurgeTimer.cancel();
                }
                this.autoPurgeTimer = new Timer();
                this.autoPurgeTimer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            policy.trimStorage(GenericStreamStorage.this.storage);
                        } catch (RuntimeException e) {
                            // an exception escaping run() would terminate the timer
                            // thread and silently stop all further purges
                            log.error("Error while purging storage " + MsgUtils.moduleString(GenericStreamStorage.this), e);
                        }
                    }
                }, 0L, (long) (cfg.autoPurgeConfig.purgePeriod * 1000.0d));
            }
        } catch (Exception e) {
            throw new StorageException("Cannot instantiate underlying storage " + cfg.storageConfig.moduleClass, e);
        }
    }

    /**
     * Prepares a storage that has no record stores yet for the given data source.
     * <p>
     * Stores the current description of the data source, makes sure producer
     * info exists for every entity when both the data source and the storage are
     * multi-source, then adds one record store per selected output.
     * </p>
     *
     * @param iDataProducerModule data source whose description and outputs are used
     * @param iRecordStorageModule storage to configure
     * @throws RuntimeException if the storage already contains record stores
     * @throws StorageException declared by this method; not thrown directly by this code
     */
    protected void configureStorageForDataSource(IDataProducerModule<?> iDataProducerModule, IRecordStorageModule<?> iRecordStorageModule) throws StorageException {
        // refuse to touch a storage that already holds record stores
        if (!iRecordStorageModule.getRecordStores().isEmpty()) {
            throw new RuntimeException("Storage " + MsgUtils.moduleString(iRecordStorageModule) + " is already configured");
        }

        iRecordStorageModule.storeDataSourceDescription(iDataProducerModule.getCurrentDescription());

        boolean bothMultiSource = iDataProducerModule instanceof IMultiSourceDataProducer
                && iRecordStorageModule instanceof IMultiSourceStorage;
        if (bothMultiSource) {
            for (String entityID : ((IMultiSourceDataProducer) iDataProducerModule).getEntityIDs()) {
                ensureProducerInfo(entityID);
            }
        }

        for (IStreamingDataInterface output : getSelectedOutputs(iDataProducerModule)) {
            iRecordStorageModule.addRecordStore(
                    output.getName(),
                    output.getRecordDescription(),
                    output.getRecommendedEncoding());
        }
    }

    /**
     * Returns the outputs of the data source that this storage should record.
     * <p>
     * When no output names are configured, all outputs of the data source are
     * returned. Otherwise only the configured names are looked up; a configured
     * name that does not match any output is logged and skipped so that callers
     * never receive {@code null} entries.
     * </p>
     *
     * @param iDataProducerModule data source to take the outputs from
     * @return the selected outputs, never containing {@code null}
     */
    protected Collection<? extends IStreamingDataInterface> getSelectedOutputs(IDataProducerModule<?> iDataProducerModule) {
        String[] selectedNames = ((StreamStorageConfig) this.config).selectedOutputs;
        if (selectedNames == null || selectedNames.length == 0) {
            return iDataProducerModule.getAllOutputs().values();
        }

        Map<String, ? extends IStreamingDataInterface> allOutputs = iDataProducerModule.getAllOutputs();
        List<IStreamingDataInterface> selected = new ArrayList<IStreamingDataInterface>(selectedNames.length);
        for (String outputName : selectedNames) {
            IStreamingDataInterface output = allOutputs.get(outputName);
            if (output == null) {
                // a stale or misspelled name must not end up as a null element
                log.warn("Selected output " + outputName + " does not exist in data source " + MsgUtils.moduleString(iDataProducerModule));
                continue;
            }
            selected.add(output);
        }
        return selected;
    }

    /**
     * Makes sure the underlying multi-source storage has a data store for the
     * given producer.
     * <p>
     * Nothing happens unless the underlying storage is multi-source, does not yet
     * know the producer ID, and the data source is still reachable and is a
     * multi-source producer. In that case a data store is added, the producer's
     * current description is stored when available, and one record store is
     * added per selected output.
     * </p>
     *
     * @param str ID of the producer (entity) to register
     */
    protected void ensureProducerInfo(String str) {
        if (!(this.storage instanceof IMultiSourceStorage)) {
            return;
        }
        IMultiSourceStorage multiStorage = (IMultiSourceStorage) this.storage;
        if (multiStorage.getProducerIDs().contains(str)) {
            return;
        }

        // the reference is null before start() and after stop(), and may also have been cleared
        WeakReference<IDataProducerModule<?>> sourceRef = this.dataSourceRef;
        IDataProducerModule<?> dataSource = (sourceRef != null) ? sourceRef.get() : null;
        if (!(dataSource instanceof IMultiSourceDataProducer)) {
            return;
        }

        IBasicStorage producerStore = multiStorage.addDataStore(str);
        AbstractProcess description = ((IMultiSourceDataProducer) dataSource).getCurrentDescription(str);
        if (description != null) {
            producerStore.storeDataSourceDescription(description);
        }
        for (IStreamingDataInterface output : getSelectedOutputs(dataSource)) {
            producerStore.addRecordStore(output.getName(), output.getRecordDescription(), output.getRecommendedEncoding());
        }
    }

    /**
     * Gets ready to record data from the given output: caches a time stamp
     * indexer for the output name when none is cached yet, then registers this
     * module as listener on the output.
     *
     * @param iStreamingDataInterface output to listen to
     */
    protected void prepareToReceiveEvents(IStreamingDataInterface iStreamingDataInterface) {
        final String outputName = iStreamingDataInterface.getName();

        boolean indexerMissing = this.timeStampIndexers.get(outputName) == null;
        if (indexerMissing) {
            ScalarIndexer indexer = SWEHelper.getTimeStampIndexer(iStreamingDataInterface.getRecordDescription());
            this.timeStampIndexers.put(outputName, indexer);
        }

        iStreamingDataInterface.registerListener(this);
    }

    /**
     * Stops this stream storage.
     * <p>
     * Unregisters this module from the selected outputs and from the data source
     * itself (when it is still reachable), drops the data source reference,
     * cancels the auto-purge timer and stops the underlying storage.
     * </p>
     *
     * @throws SensorHubException if stopping the underlying storage fails
     */
    @Override // org.sensorhub.api.module.IModule
    public void stop() throws SensorHubException {
        if (this.dataSourceRef != null) {
            IDataProducerModule<?> dataSource = this.dataSourceRef.get();
            if (dataSource != null) {
                for (IStreamingDataInterface output : getSelectedOutputs(dataSource)) {
                    // a configured output name may not exist on the data source
                    if (output != null) {
                        output.unregisterListener(this);
                    }
                }
                // start() also registers on the module itself; undo that so no
                // events are delivered to a stopped storage
                dataSource.unregisterListener(this);
            }
            this.dataSourceRef = null;
        }

        if (this.autoPurgeTimer != null) {
            this.autoPurgeTimer.cancel();
            this.autoPurgeTimer = null;
        }

        if (this.storage != null) {
            this.storage.stop();
        }
    }

    /**
     * Cleans up the underlying storage, if one has been created.
     *
     * @throws SensorHubException if the underlying storage fails to clean up
     */
    @Override // org.sensorhub.api.module.IModule
    public void cleanup() throws SensorHubException {
        final IRecordStorageModule<StorageConfig> underlying = this.storage;
        if (underlying == null) {
            return;
        }
        underlying.cleanup();
    }

    /**
     * Handles events received from the data source and its outputs.
     * <p>
     * Events are ignored entirely when event processing is disabled in the
     * configuration. Data events have their records stored, sensor events of
     * type {@code SENSOR_CHANGED} cause the current data source description to
     * be stored, and FOI events update the current feature of interest (and are
     * stored when the underlying storage is an {@code IObsStorage}).
     * </p>
     *
     * @param event event to process
     */
    @Override // org.sensorhub.api.common.IEventListener
    public void handleEvent(Event<?> event) {
        if (!((StreamStorageConfig) this.config).processEvents) {
            return;
        }

        if (event instanceof DataEvent) {
            storeDataRecords((DataEvent) event);
        } else if (event instanceof SensorEvent) {
            storeSensorDescriptionUpdate((SensorEvent) event);
        } else if ((event instanceof FoiEvent) && (this.storage instanceof IObsStorage)) {
            storeFoiUpdate((FoiEvent) event);
        }
    }

    /** Stores the current data source description when the sensor reports a change. */
    private void storeSensorDescriptionUpdate(SensorEvent sensorEvent) {
        if (sensorEvent.getType() != SensorEvent.Type.SENSOR_CHANGED) {
            return;
        }

        // the reference is null after stop() and may have been cleared
        WeakReference<IDataProducerModule<?>> sourceRef = this.dataSourceRef;
        IDataProducerModule<?> dataSource = (sourceRef != null) ? sourceRef.get() : null;
        if (dataSource != null) {
            this.storage.storeDataSourceDescription(dataSource.getCurrentDescription());
        }
    }

    /** Stores the feature carried by a FOI event (if any) and records its ID as the current FOI. */
    private void storeFoiUpdate(FoiEvent foiEvent) {
        String producerID = foiEvent.getRelatedEntityID();

        if (foiEvent.getFoi() != null) {
            if (producerID != null) {
                ensureProducerInfo(producerID);
            }
            ((IObsStorage) this.storage).storeFoi(producerID, foiEvent.getFoi());
        }

        if (producerID != null) {
            this.currentFoiMap.put(producerID, foiEvent.getFoiID());
        } else {
            this.currentFoi = foiEvent.getFoiID();
        }
    }

    /** Stores all records of a data event and commits when the minimum commit period has elapsed. */
    private void storeDataRecords(DataEvent dataEvent) {
        boolean wasAutoCommit = this.storage.isAutoCommit();
        this.storage.setAutoCommit(false);
        try {
            String outputName = dataEvent.getSource().getName();
            ScalarIndexer timeIndexer = this.timeStampIndexers.get(outputName);
            String producerID = dataEvent.getRelatedEntityID();

            for (DataBlock record : dataEvent.getRecords()) {
                // use the time stamp found in the record, else fall back to the event time (ms -> s)
                double time = (timeIndexer != null) ? timeIndexer.getDoubleValue(record) : dataEvent.getTimeStamp() / 1000.0d;

                String foiID;
                if (producerID != null) {
                    ensureProducerInfo(producerID);
                    foiID = this.currentFoiMap.get(producerID);
                } else {
                    foiID = this.currentFoi;
                }

                ObsKey obsKey = new ObsKey(outputName, producerID, foiID, time);
                this.storage.storeRecord(obsKey, record);
                if (log.isTraceEnabled()) {
                    log.trace("Storing record {} for output {}", obsKey.timeStamp, outputName);
                }
            }

            long now = System.currentTimeMillis();
            if (this.lastCommitTime == Long.MIN_VALUE || now - this.lastCommitTime > ((StreamStorageConfig) this.config).minCommitPeriod) {
                this.storage.commit();
                this.lastCommitTime = now;
            }
        } finally {
            // always give the storage its original auto-commit mode back, even if storing failed
            this.storage.setAutoCommit(wasAutoCommit);
        }
    }

    /**
     * Adds a record store to the underlying storage unless one with the same
     * name already exists, then starts listening to the data source output of
     * that name when the data source is reachable and has such an output.
     *
     * @param str name of the record store (and of the matching output)
     * @param dataComponent record structure
     * @param dataEncoding record encoding
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void addRecordStore(String str, DataComponent dataComponent, DataEncoding dataEncoding) {
        checkStarted();
        if (!this.storage.getRecordStores().containsKey(str)) {
            this.storage.addRecordStore(str, dataComponent, dataEncoding);
        }

        // the reference is null after stop() and may have been cleared
        WeakReference<IDataProducerModule<?>> sourceRef = this.dataSourceRef;
        IDataProducerModule<?> dataSource = (sourceRef != null) ? sourceRef.get() : null;
        if (dataSource == null) {
            return;
        }

        // the record store name does not necessarily match an existing output
        IStreamingDataInterface output = dataSource.getAllOutputs().get(str);
        if (output != null) {
            prepareToReceiveEvents(output);
        }
    }

    /**
     * Forwards the backup request to the underlying storage.
     *
     * @param outputStream stream handed to the underlying storage
     * @throws IOException if the underlying storage reports an I/O error
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void backup(OutputStream outputStream) throws IOException {
        checkStarted();
        storage.backup(outputStream);
    }

    /**
     * Forwards the restore request to the underlying storage.
     *
     * @param inputStream stream handed to the underlying storage
     * @throws IOException if the underlying storage reports an I/O error
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void restore(InputStream inputStream) throws IOException {
        checkStarted();
        storage.restore(inputStream);
    }

    /**
     * Sets the auto-commit flag of the underlying storage.
     *
     * @param z new auto-commit value passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void setAutoCommit(boolean z) {
        checkStarted();
        storage.setAutoCommit(z);
    }

    /**
     * Reports the auto-commit flag of the underlying storage.
     *
     * @return value returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public boolean isAutoCommit() {
        checkStarted();
        final boolean autoCommit = storage.isAutoCommit();
        return autoCommit;
    }

    /**
     * Asks the underlying storage to commit.
     *
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void commit() {
        checkStarted();
        storage.commit();
    }

    /**
     * Asks the underlying storage to roll back.
     *
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void rollback() {
        checkStarted();
        storage.rollback();
    }

    /**
     * Forwards the sync request to the underlying storage.
     *
     * @param iStorageModule storage module passed on to the underlying storage
     * @throws StorageException if the underlying storage fails to sync
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IStorageModule
    public void sync(IStorageModule<?> iStorageModule) throws StorageException {
        checkStarted();
        storage.sync(iStorageModule);
    }

    /**
     * Fetches the latest data source description from the underlying storage.
     *
     * @return description returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public AbstractProcess getLatestDataSourceDescription() {
        checkStarted();
        final AbstractProcess latest = storage.getLatestDataSourceDescription();
        return latest;
    }

    /**
     * Fetches the data source description history from the underlying storage.
     *
     * @param d first time bound passed to the underlying storage
     * @param d2 second time bound passed to the underlying storage
     * @return list returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public List<AbstractProcess> getDataSourceDescriptionHistory(double d, double d2) {
        checkStarted();
        final List<AbstractProcess> history = storage.getDataSourceDescriptionHistory(d, d2);
        return history;
    }

    /**
     * Fetches the data source description for a given time from the underlying storage.
     *
     * @param d time passed to the underlying storage
     * @return description returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public AbstractProcess getDataSourceDescriptionAtTime(double d) {
        checkStarted();
        final AbstractProcess description = storage.getDataSourceDescriptionAtTime(d);
        return description;
    }

    /**
     * Stores a data source description in the underlying storage.
     *
     * @param abstractProcess description passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void storeDataSourceDescription(AbstractProcess abstractProcess) {
        checkStarted();
        storage.storeDataSourceDescription(abstractProcess);
    }

    /**
     * Updates a data source description in the underlying storage.
     *
     * @param abstractProcess description passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void updateDataSourceDescription(AbstractProcess abstractProcess) {
        checkStarted();
        storage.updateDataSourceDescription(abstractProcess);
    }

    /**
     * Removes a data source description from the underlying storage.
     *
     * @param d time passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void removeDataSourceDescription(double d) {
        checkStarted();
        storage.removeDataSourceDescription(d);
    }

    /**
     * Removes a range of the data source description history from the underlying storage.
     *
     * @param d first time bound passed to the underlying storage
     * @param d2 second time bound passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void removeDataSourceDescriptionHistory(double d, double d2) {
        checkStarted();
        storage.removeDataSourceDescriptionHistory(d, d2);
    }

    /**
     * Lists the record stores of the underlying storage.
     *
     * @return map returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public Map<String, ? extends IRecordStoreInfo> getRecordStores() {
        checkStarted();
        final Map<String, ? extends IRecordStoreInfo> stores = storage.getRecordStores();
        return stores;
    }

    /**
     * Looks up a single data block in the underlying storage.
     *
     * @param dataKey key passed to the underlying storage
     * @return data block returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public DataBlock getDataBlock(DataKey dataKey) {
        checkStarted();
        final DataBlock block = storage.getDataBlock(dataKey);
        return block;
    }

    /**
     * Obtains a data block iterator from the underlying storage.
     *
     * @param iDataFilter filter passed to the underlying storage
     * @return iterator returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public Iterator<DataBlock> getDataBlockIterator(IDataFilter iDataFilter) {
        checkStarted();
        final Iterator<DataBlock> blocks = storage.getDataBlockIterator(iDataFilter);
        return blocks;
    }

    /**
     * Obtains a record iterator from the underlying storage.
     *
     * @param iDataFilter filter passed to the underlying storage
     * @return iterator returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public Iterator<? extends IDataRecord> getRecordIterator(IDataFilter iDataFilter) {
        checkStarted();
        final Iterator<? extends IDataRecord> records = storage.getRecordIterator(iDataFilter);
        return records;
    }

    /**
     * Asks the underlying storage for the number of records matching a filter.
     *
     * @param iDataFilter filter passed to the underlying storage
     * @param j second argument passed unchanged to the underlying storage
     * @return count returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public int getNumMatchingRecords(IDataFilter iDataFilter, long j) {
        checkStarted();
        final int matching = storage.getNumMatchingRecords(iDataFilter, j);
        return matching;
    }

    /**
     * Asks the underlying storage for the number of records of a record store.
     *
     * @param str record store name passed to the underlying storage
     * @return count returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public int getNumRecords(String str) {
        checkStarted();
        final int count = storage.getNumRecords(str);
        return count;
    }

    /**
     * Asks the underlying storage for the time range of a record store.
     *
     * @param str record store name passed to the underlying storage
     * @return array returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public double[] getRecordsTimeRange(String str) {
        checkStarted();
        final double[] timeRange = storage.getRecordsTimeRange(str);
        return timeRange;
    }

    /**
     * Asks the underlying storage for the time clusters of a record store.
     *
     * @param str record store name passed to the underlying storage
     * @return iterator returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public Iterator<double[]> getRecordsTimeClusters(String str) {
        checkStarted();
        final Iterator<double[]> clusters = storage.getRecordsTimeClusters(str);
        return clusters;
    }

    /**
     * Stores a record in the underlying storage.
     *
     * @param dataKey key passed to the underlying storage
     * @param dataBlock record data passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void storeRecord(DataKey dataKey, DataBlock dataBlock) {
        checkStarted();
        storage.storeRecord(dataKey, dataBlock);
    }

    /**
     * Updates a record in the underlying storage.
     *
     * @param dataKey key passed to the underlying storage
     * @param dataBlock record data passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void updateRecord(DataKey dataKey, DataBlock dataBlock) {
        checkStarted();
        storage.updateRecord(dataKey, dataBlock);
    }

    /**
     * Removes a record from the underlying storage.
     *
     * @param dataKey key passed to the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public void removeRecord(DataKey dataKey) {
        checkStarted();
        storage.removeRecord(dataKey);
    }

    /**
     * Removes the records matching a filter from the underlying storage.
     *
     * @param iDataFilter filter passed to the underlying storage
     * @return value returned by the underlying storage
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IBasicStorage
    public int removeRecords(IDataFilter iDataFilter) {
        checkStarted();
        final int removed = storage.removeRecords(iDataFilter);
        return removed;
    }

    /**
     * Counts the features of interest matching a filter.
     *
     * @param iFoiFilter filter passed to the underlying storage
     * @return count reported by the underlying storage, or 0 when the underlying
     *         storage is not an {@code IObsStorage}
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IObsStorage
    public int getNumFois(IFoiFilter iFoiFilter) {
        checkStarted();
        return (storage instanceof IObsStorage) ? ((IObsStorage) storage).getNumFois(iFoiFilter) : 0;
    }

    /**
     * Retrieves the spatial extent of the stored features of interest.
     *
     * @return extent reported by the underlying storage, or {@code null} when the
     *         underlying storage is not an {@code IObsStorage}
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IObsStorage
    public Bbox getFoisSpatialExtent() {
        checkStarted();
        if (!(storage instanceof IObsStorage)) {
            return null;
        }
        final IObsStorage obsStorage = (IObsStorage) storage;
        return obsStorage.getFoisSpatialExtent();
    }

    /**
     * Retrieves the IDs of the features of interest matching a filter.
     *
     * @param iFoiFilter filter passed to the underlying storage
     * @return iterator supplied by the underlying storage, or an empty iterator
     *         when the underlying storage is not an {@code IObsStorage}
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IObsStorage
    public Iterator<String> getFoiIDs(IFoiFilter iFoiFilter) {
        checkStarted();
        if (this.storage instanceof IObsStorage) {
            return ((IObsStorage) this.storage).getFoiIDs(iFoiFilter);
        }
        // typed empty list instead of the raw Collections.EMPTY_LIST constant
        return Collections.<String>emptyList().iterator();
    }

    /**
     * Retrieves the features of interest matching a filter.
     *
     * @param iFoiFilter filter passed to the underlying storage
     * @return iterator supplied by the underlying storage, or an empty iterator
     *         when the underlying storage is not an {@code IObsStorage}
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IObsStorage
    public Iterator<AbstractFeature> getFois(IFoiFilter iFoiFilter) {
        checkStarted();
        if (this.storage instanceof IObsStorage) {
            return ((IObsStorage) this.storage).getFois(iFoiFilter);
        }
        // typed empty list instead of the raw Collections.EMPTY_LIST constant
        return Collections.<AbstractFeature>emptyList().iterator();
    }

    /**
     * Stores a feature of interest in the underlying storage. Nothing is stored
     * when the underlying storage is not an {@code IObsStorage}.
     *
     * @param str ID of the producer the feature relates to
     * @param abstractFeature feature of interest to store
     * @throws RuntimeException if the underlying storage has not been created
     */
    @Override // org.sensorhub.api.persistence.IObsStorage
    public void storeFoi(String str, AbstractFeature abstractFeature) {
        checkStarted();
        if (this.storage instanceof IObsStorage) {
            // delegate to the wrapped storage; calling storeFoi on this object
            // would recurse until the stack overflows
            ((IObsStorage) this.storage).storeFoi(str, abstractFeature);
        }
    }

    /**
     * Guards delegating methods against use before the underlying storage exists.
     *
     * @throws RuntimeException if the underlying storage is {@code null}
     */
    private void checkStarted() {
        if (storage != null) {
            return;
        }
        throw new RuntimeException("Storage is disabled");
    }
}
