package org.sensorhub.impl.processing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import net.opengis.gml.v32.AbstractFeature;
import net.opengis.sensorml.v20.AbstractProcess;
import net.opengis.swe.v20.DataBlock;
import net.opengis.swe.v20.DataComponent;
import org.sensorhub.api.common.Event;
import org.sensorhub.api.common.SensorHubException;
import org.sensorhub.api.data.DataEvent;
import org.sensorhub.api.data.IDataProducerModule;
import org.sensorhub.api.data.IStreamingDataInterface;
import org.sensorhub.api.processing.DataSourceConfig;
import org.sensorhub.api.processing.IStreamProcessModule;
import org.sensorhub.api.processing.ProcessException;
import org.sensorhub.api.processing.StorageDataSourceConfig;
import org.sensorhub.api.processing.StreamProcessConfig;
import org.sensorhub.api.processing.StreamingDataSourceConfig;
import org.sensorhub.impl.SensorHub;
import org.sensorhub.impl.module.AbstractModule;
import org.sensorhub.impl.module.ModuleRegistry;
import org.sensorhub.utils.MsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vast.process.DataQueue;
import org.vast.sensorML.SMLFactory;
import org.vast.swe.SWEHelper;

/* loaded from: input_file:org/sensorhub/impl/processing/AbstractStreamProcess.class */
public abstract class AbstractStreamProcess<ConfigType extends StreamProcessConfig> extends AbstractModule<ConfigType> implements IStreamProcessModule<ConfigType> {
    public static final String DEFAULT_ID = "PROCESS_DESC";
    private static final Logger log = LoggerFactory.getLogger(AbstractStreamProcess.class);
    protected static final int MAX_ERRORS = 10;
    protected Map<IStreamingDataInterface, AbstractStreamProcess<ConfigType>.InputData> streamSources;
    protected AbstractProcess processDescription;
    protected Map<String, DataComponent> inputs = new LinkedHashMap();
    protected Map<String, DataComponent> outputs = new LinkedHashMap();
    protected Map<String, DataComponent> parameters = new LinkedHashMap();
    protected Map<String, IStreamingDataInterface> outputInterfaces = new LinkedHashMap();
    protected long lastUpdatedSensorDescription = Long.MIN_VALUE;
    protected boolean paused = false;
    protected int errorCount = 0;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/sensorhub/impl/processing/AbstractStreamProcess$InputData.class */
    public class InputData {
        private DataComponent srcDataBuffer;
        private List<DataQueue> dataQueues = new ArrayList();

        protected InputData() {
        }

        public List<DataQueue> getDataQueues() {
            return this.dataQueues;
        }
    }

    protected abstract void process(DataEvent dataEvent) throws ProcessException;

    protected void addOutput(IStreamingDataInterface iStreamingDataInterface) {
        String name = iStreamingDataInterface.getName();
        this.outputs.put(name, iStreamingDataInterface.getRecordDescription());
        this.outputInterfaces.put(name, iStreamingDataInterface);
    }

    @Override // org.sensorhub.api.processing.IProcessModule
    public Map<String, DataComponent> getInputDescriptors() {
        return Collections.unmodifiableMap(this.inputs);
    }

    @Override // org.sensorhub.api.processing.IProcessModule
    public Map<String, DataComponent> getOutputDescriptors() {
        return Collections.unmodifiableMap(this.outputs);
    }

    @Override // org.sensorhub.api.processing.IStreamProcessModule
    public Map<String, DataComponent> getParameters() {
        return Collections.unmodifiableMap(this.parameters);
    }

    @Override // org.sensorhub.api.data.IDataProducerModule
    public Map<String, IStreamingDataInterface> getAllOutputs() {
        return Collections.unmodifiableMap(this.outputInterfaces);
    }

    @Override // org.sensorhub.api.data.IDataProducerModule
    public AbstractFeature getCurrentFeatureOfInterest() {
        return null;
    }

    @Override // org.sensorhub.api.data.IModuleWithDescription
    public AbstractProcess getCurrentDescription() {
        if (this.processDescription == null) {
            this.processDescription = new SMLFactory().newSimpleProcess();
            this.processDescription.setId(DEFAULT_ID);
            this.processDescription.setUniqueIdentifier(getLocalID());
            this.processDescription.setName(getName());
            for (Map.Entry<String, DataComponent> entry : getInputDescriptors().entrySet()) {
                this.processDescription.addInput(entry.getKey(), entry.getValue());
            }
            for (Map.Entry<String, DataComponent> entry2 : getOutputDescriptors().entrySet()) {
                this.processDescription.addOutput(entry2.getKey(), entry2.getValue());
            }
            for (Map.Entry<String, DataComponent> entry3 : getParameters().entrySet()) {
                this.processDescription.addParameter(entry3.getKey(), entry3.getValue());
            }
        }
        return this.processDescription;
    }

    @Override // org.sensorhub.api.data.IModuleWithDescription
    public long getLastDescriptionUpdate() {
        return this.lastUpdatedSensorDescription;
    }

    protected void connectInput(String str, String str2, DataQueue dataQueue) throws Exception {
        DataComponent dataComponent = this.inputs.get(str);
        if (dataComponent == null) {
            throw new ProcessException("Input " + str + " doesn't exist");
        }
        dataQueue.setDestinationComponent(SWEHelper.findComponentByPath(dataComponent, str2));
    }

    @Override // org.sensorhub.api.module.IModule
    public void start() throws SensorHubException {
        this.errorCount = 0;
        this.streamSources = new WeakHashMap();
        for (DataSourceConfig dataSourceConfig : ((StreamProcessConfig) this.config).dataSources) {
            ModuleRegistry moduleRegistry = SensorHub.getInstance().getModuleRegistry();
            if (!isCompatibleDataSource(dataSourceConfig)) {
                throw new ProcessException("Data source is not supported");
            }
            if (dataSourceConfig instanceof StreamingDataSourceConfig) {
                StreamingDataSourceConfig streamingDataSourceConfig = (StreamingDataSourceConfig) dataSourceConfig;
                IDataProducerModule iDataProducerModule = (IDataProducerModule) moduleRegistry.getModuleById(streamingDataSourceConfig.producerID);
                for (DataSourceConfig.InputLinkConfig inputLinkConfig : streamingDataSourceConfig.inputConnections) {
                    DataQueue dataQueue = new DataQueue();
                    try {
                        int indexOf = inputLinkConfig.source.indexOf(47);
                        String substring = indexOf < 0 ? inputLinkConfig.source : inputLinkConfig.source.substring(0, indexOf);
                        String substring2 = indexOf < 0 ? "/" : inputLinkConfig.source.substring(indexOf + 1);
                        IStreamingDataInterface iStreamingDataInterface = iDataProducerModule.getAllOutputs().get(substring);
                        if (iStreamingDataInterface == null) {
                            throw new ProcessException("Output " + substring + " doesn't exist");
                        }
                        AbstractStreamProcess<ConfigType>.InputData inputData = this.streamSources.get(iStreamingDataInterface);
                        if (inputData == null) {
                            inputData = new InputData();
                            ((InputData) inputData).srcDataBuffer = iStreamingDataInterface.getRecordDescription().copy();
                            this.streamSources.put(iStreamingDataInterface, inputData);
                        }
                        dataQueue.setSourceComponent(SWEHelper.findComponentByPath(((InputData) inputData).srcDataBuffer, substring2));
                        try {
                            if (inputLinkConfig.destination.equals(DataSourceConfig.AUTO_CREATE)) {
                                DataComponent copy = dataQueue.getSourceComponent().copy();
                                this.inputs.put(copy.getName(), copy);
                                dataQueue.setDestinationComponent(copy);
                            } else {
                                int indexOf2 = inputLinkConfig.destination.indexOf(47);
                                connectInput(indexOf2 < 0 ? inputLinkConfig.destination : inputLinkConfig.destination.substring(0, indexOf2), indexOf2 < 0 ? "/" : inputLinkConfig.destination.substring(indexOf2 + 1), dataQueue);
                            }
                            ((InputData) inputData).dataQueues.add(dataQueue);
                            iStreamingDataInterface.registerListener(this);
                        } catch (Exception e) {
                            throw new ProcessException("Error while connecting to input signal " + inputLinkConfig.destination + " of process " + MsgUtils.moduleString(this), e);
                        }
                    } catch (Exception e2) {
                        throw new ProcessException("Error while connecting to output signal " + inputLinkConfig.source + " of " + MsgUtils.moduleString(iDataProducerModule), e2);
                    }
                }
            } else if (dataSourceConfig instanceof StorageDataSourceConfig) {
            }
        }
    }

    @Override // org.sensorhub.api.module.IModule
    public void stop() {
        if (this.streamSources != null) {
            for (Map.Entry<IStreamingDataInterface, AbstractStreamProcess<ConfigType>.InputData> entry : this.streamSources.entrySet()) {
                entry.getKey().unregisterListener(this);
                Iterator it = ((InputData) entry.getValue()).dataQueues.iterator();
                while (it.hasNext()) {
                    ((DataQueue) it.next()).clear();
                }
            }
        }
    }

    @Override // org.sensorhub.api.module.IModule
    public void cleanup() {
    }

    @Override // org.sensorhub.api.processing.IStreamProcessModule
    public void pause() {
        this.paused = true;
    }

    @Override // org.sensorhub.api.processing.IStreamProcessModule
    public void resume() {
        this.paused = false;
    }

    @Override // org.sensorhub.api.common.IEventListener
    public void handleEvent(Event<?> event) {
        AbstractStreamProcess<ConfigType>.InputData inputData;
        if (this.paused || !(event instanceof DataEvent) || (inputData = this.streamSources.get(event.getSource())) == null) {
            return;
        }
        for (DataBlock dataBlock : ((DataEvent) event).getRecords()) {
            ((InputData) inputData).srcDataBuffer.setData(dataBlock);
            for (DataQueue dataQueue : ((InputData) inputData).dataQueues) {
                dataQueue.add(dataQueue.getSourceComponent().getData());
            }
        }
        try {
            process((DataEvent) event);
        } catch (ProcessException e) {
            log.error("Error while processing data event with time stamp {}", Long.valueOf(event.getTimeStamp()), e);
            this.errorCount++;
            if (this.errorCount > MAX_ERRORS) {
                log.error("Too many errors, stopping processing {}", getName());
                stop();
            }
        }
    }
}
