Source code for sql_queries

import sqlalchemy as sqla

from sql_model import OmnetppTableModel as TM

from common.logging_facilities import logi

r"""
Query the `runAttr` table and return all the rows contained in it.
The equivalent SQL query:

.. code-block:: sql

 SELECT attrName, attrValue FROM runAttr;

"""
run_attr_query = sqla.select(TM.runAttr_table.c.rowId, TM.runAttr_table.c.attrName, TM.runAttr_table.c.attrValue)

r"""
Query the `runParam` table and return all the rows contained in it.
The equivalent SQL query:

.. code-block:: sql

 SELECT paramKey, paramValue FROM runParam;

"""
run_param_query = sqla.select(TM.runParam_table.c.rowId, TM.runParam_table.c.paramKey, TM.runParam_table.c.paramValue)

r"""
Query the `runConfig` table and return all the rows contained in it. For OMNeT++ versions >= 6.
The equivalent SQL query:

.. code-block:: sql

 SELECT configKey, configValue FROM runConfig;

"""
run_config_query = sqla.select(TM.runConfig_table.c.rowId, TM.runConfig_table.c.configKey, TM.runConfig_table.c.configValue)

r"""
Query the `vector` table and return all the `vectorName`s contained in it.
The equivalent SQL query:

.. code-block:: sql

 SELECT vectorName FROM vector_table;

"""
signal_names_query = sqla.select(TM.vector_table.c.vectorName)

r"""
Query the `scalar` table and return all the rows contained in it.
The equivalent SQL query:

.. code-block:: sql

 SELECT * FROM scalar;

"""
scalar_table_query = sqla.select(  TM.scalar_table.c.scalarId
                                 , TM.scalar_table.c.runId
                                 , TM.scalar_table.c.moduleName
                                 , TM.scalar_table.c.scalarName
                                 , TM.scalar_table.c.scalarValue
                                    )



r"""
Query the `scalar` table and return all the rows contained in it.
The equivalent SQL query:

.. code-block:: sql

 SELECT * FROM statistic;

"""
statistic_table_query = sqla.select(  TM.statistic_table.c.statId
                                    , TM.statistic_table.c.runId
                                    , TM.statistic_table.c.moduleName
                                    , TM.statistic_table.c.statName
                                    , TM.statistic_table.c.isHistogram
                                    , TM.statistic_table.c.isWeighted
                                    , TM.statistic_table.c.statCount
                                    , TM.statistic_table.c.statMean
                                    , TM.statistic_table.c.statStddev
                                    , TM.statistic_table.c.statSum
                                    , TM.statistic_table.c.statSqrsum
                                    , TM.statistic_table.c.statMin
                                    , TM.statistic_table.c.statMax
                                    , TM.statistic_table.c.statWeights
                                    , TM.statistic_table.c.statWeightedSum
                                    , TM.statistic_table.c.statSqrSumWeights
                                    , TM.statistic_table.c.statWeightedSqrSum
                                    )


[docs] def generate_statistic_data_query(where_clause:sqla.sql.elements.ColumnElement , statId:bool=True , runId:bool=True , moduleName:bool=True , statName:bool=True , isHistogram:bool=True , isWeighted:bool=True , statCount:bool=True , statMean:bool=True , statStddev:bool=True , statSum:bool=True , statSqrsum:bool=True , statMin:bool=True , statMax:bool=True , statWeights:bool=True , statWeightedSum:bool=True , statSqrSumWeights:bool=True , statWeightedSqrSum:bool=True ): columns = [] if statId: columns.append(TM.statistic_table.c.statId) if runId: columns.append(TM.statistic_table.c.runId) if moduleName: columns.append(TM.statistic_table.c.moduleName) if statName: columns.append(TM.statistic_table.c.statName) if isHistogram: columns.append(TM.statistic_table.c.isHistogram) if isWeighted: columns.append(TM.statistic_table.c.isWeighted) if statCount: columns.append(TM.statistic_table.c.statCount) if statMean: columns.append(TM.statistic_table.c.statMean) if statStddev: columns.append(TM.statistic_table.c.statStddev) if statSum: columns.append(TM.statistic_table.c.statSum) if statSqrsum: columns.append(TM.statistic_table.c.statSqrsum) if statMin: columns.append(TM.statistic_table.c.statMin) if statMax: columns.append(TM.statistic_table.c.statMax) if statWeights: columns.append(TM.statistic_table.c.statWeights) if statWeightedSum: columns.append(TM.statistic_table.c.statWeightedSum) if statSqrSumWeights: columns.append(TM.statistic_table.c.statSqrSumWeights) if statWeightedSqrSum: columns.append(TM.statistic_table.c.statWeightedSqrSum) query = sqla.select(*columns)\ .where( where_clause ) return query
[docs] def generate_statistic_query(statistic_name:str , statId:bool=True , runId:bool=True , moduleName:bool=True , statName:bool=True , isHistogram:bool=True , isWeighted:bool=True , statCount:bool=True , statMean:bool=True , statStddev:bool=True , statSum:bool=True , statSqrsum:bool=True , statMin:bool=True , statMax:bool=True , statWeights:bool=True , statWeightedSum:bool=True , statSqrSumWeights:bool=True , statWeightedSqrSum:bool=True ): return generate_statistic_data_query(TM.statistic_table.c.statName == statistic_name , statId=statId , runId=runId , moduleName=moduleName , statName=statName , isHistogram=isHistogram , isWeighted=isWeighted , statCount=statCount , statMean=statMean , statStddev=statStddev , statSum=statSum , statSqrsum=statSqrsum , statMin=statMin , statMax=statMax , statWeights=statWeights , statWeightedSum=statWeightedSum , statSqrSumWeights=statSqrSumWeights , statWeightedSqrSum=statWeightedSqrSum )
[docs] def generate_scalar_data_query(where_clause:sqla.sql.elements.ColumnElement , value_label:str='scalarValue' , runId:bool=True , moduleName:bool=True , scalarName:bool=False , scalarId:bool=False ): columns = [] if runId: columns.append(TM.scalar_table.c.runId) if moduleName: columns.append(TM.scalar_table.c.moduleName) if scalarName: columns.append(TM.scalar_table.c.scalarName) if scalarId: columns.append(TM.scalar_table.c.scalarId) query = sqla.select(*columns , TM.scalar_table.c.scalarValue.label(value_label) ) \ .where( where_clause ) return query
[docs] def generate_scalar_query(scalar_name:str, value_label:str='scalarValue' , runId:bool=True , moduleName:bool=True , scalarName:bool=False , scalarId:bool=False ): return generate_scalar_data_query(TM.scalar_table.c.scalarName == scalar_name , value_label=value_label , runId=runId , moduleName=moduleName , scalarName=scalarName, scalarId=scalarId)
[docs] def generate_scalar_like_query(scalar_name:str, value_label:str='scalarValue' , runId:bool=True , moduleName:bool=True , scalarName:bool=False , scalarId:bool=False ): return generate_scalar_data_query(TM.scalar_table.c.scalarName.like(scalar_name) , value_label=value_label , runId=runId , moduleName=moduleName , scalarName=scalarName, scalarId=scalarId)
[docs] def generate_signal_query(signal_name:str, value_label:str='value' , moduleName:bool=True , simtimeRaw:bool=True , eventNumber:bool=False ): return generate_data_query(TM.vector_table.c.vectorName == signal_name, value_label=value_label , moduleName=moduleName, simtimeRaw=simtimeRaw, eventNumber=eventNumber)
[docs] def generate_signal_like_query(signal_name_pattern:str, value_label:str='value' , vectorName:bool=False , moduleName:bool=True , simtimeRaw:bool=True , eventNumber:bool=False ): return generate_data_query(TM.vector_table.c.vectorName.like(signal_name_pattern), value_label=value_label , vectorName=vectorName, moduleName=moduleName, simtimeRaw=simtimeRaw, eventNumber=eventNumber)
[docs] def generate_signal_for_module_query(signal_name:str, module_name:str, value_label='value' , moduleName:bool=True , simtimeRaw:bool=True , eventNumber:bool=False ): r""" Extract the data for the signal given by `signal_name` for the given `moduleName` only """ return generate_data_query( sqla.and_(TM.vector_table.c.vectorName == signal_name , TM.vector_table.c.moduleName.like(module_name) ) , value_label=value_label , moduleName=moduleName, simtimeRaw=simtimeRaw, eventNumber=eventNumber )
[docs] def generate_data_query(where_clause:sqla.sql.elements.ColumnElement , value_label:str='value' , vectorName:bool=False , moduleName:bool=True , simtimeRaw:bool=True , eventNumber:bool=False ): columns = [] if vectorName: columns.append(TM.vector_table.c.vectorName) if moduleName: columns.append(TM.vector_table.c.moduleName) if simtimeRaw: columns.append(TM.vectorData_table.c.simtimeRaw) if eventNumber: columns.append(TM.vectorData_table.c.eventNumber) query = sqla.select( TM.vectorData_table.c.rowId , *columns , TM.vectorData_table.c.value.label(value_label) ) \ .join( TM.vectorData_table , TM.vector_table.c.vectorId == TM.vectorData_table.c.vectorId ) \ .where( where_clause ) return query
[docs] def get_signal_with_position(x_signal:str, y_signal:str , value_label_px:str, value_label_py:str , signal_name:str, value_label:str , restriction:tuple=None , moduleName:bool=True , simtimeRaw:bool=True , eventNumber:bool=False ): r""" Get all the signal data for the signal with the name `signal_name` within the rectangle described by the tuple given in `restriction`. The equivalent SQL query: .. code-block:: sql WITH pxvids AS (SELECT vectorId, moduleName FROM vector AS v WHERE vectorName == '<positionX>'), pyvids AS (SELECT vectorId, moduleName FROM vector AS v WHERE vectorName == '<positionY>'), pys AS (SELECT moduleName, eventNumber, simtimeRaw, value AS posY FROM pyvids JOIN vectorData AS vd ON vd.vectorId == pyvids.vectorId WHERE vd.value < <y_max> AND vd.value > <y_min> ), pxs AS (SELECT moduleName, eventNumber, simtimeRaw, value AS posX FROM pxvids JOIN vectorData AS vd ON vd.vectorId == pxvids.vectorId WHERE vd.value < <x_max> AND vd.value > <x_min> ), pos AS (SELECT pxs.moduleName, pxs.eventNumber, pxs.simtimeRaw, posX, posY FROM pxs JOIN pys ON pxs.eventNumber == pys.eventNumber ), val AS (SELECT vd.vectorId, vd.eventNumber, vd.value FROM vector AS v JOIN vectorData AS vd ON v.vectorId == vd.vectorId WHERE v.vectorName == '<signal_name>' ) SELECT p.moduleName, p.eventNumber, p.simtimeRaw, posX, posY, v.value FROM pos AS p JOIN val AS v ON p.vectorId == v.vectorId AND p.eventNumber == v.eventNumber ; Parameters ---------- x_signal : str The name of the signal containing the x-coordinate data y_signal : str The name of the signal containing the y-coordinate data value_label_px : str The name for the x-coordinate in the output value_label_py : str The name for the y-coordinate in the output signal_name : str The name of the signal to extract value_label : str The name for the signal in the output restriction : tuple The selection rectangle, defined as (x_min, y_min, x_max, y_max) moduleName : bool Whether to include the `moduleName` in the output simtimeRaw : bool Whether to include the `simtimeRaw` in the output eventNumber : bool Whether to include the `eventNumber` in the output """ # get the vectorIds for the x & y position signals pxidsq = sqla.select(TM.vector_table.c.vectorId, TM.vector_table.c.moduleName) \ .where(TM.vector_table.c.vectorName == x_signal) pyidsq = sqla.select(TM.vector_table.c.vectorId, TM.vector_table.c.moduleName) \ .where(TM.vector_table.c.vectorName == y_signal) # get the data for the x & y position vectorIds, each within a given interval pxs = sqla.select(pxidsq.c.moduleName , TM.vectorData_table.c.eventNumber , TM.vectorData_table.c.simtimeRaw , TM.vectorData_table.c.value ).join(TM.vectorData_table , TM.vectorData_table.c.vectorId == pxidsq.c.vectorId) pys = sqla.select(pyidsq.c.moduleName , TM.vectorData_table.c.eventNumber , TM.vectorData_table.c.simtimeRaw , TM.vectorData_table.c.value ).join(TM.vectorData_table , TM.vectorData_table.c.vectorId == pyidsq.c.vectorId) # apply geographic restriction: check if the position is within a rectangular area if restriction: logi(f'applying geographic restriction with parameters: {restriction=}') x_min, y_min, x_max, y_max = restriction pxs = pxs.where(TM.vectorData_table.c.value <= x_max).where(TM.vectorData_table.c.value >= x_min) pys = pys.where(TM.vectorData_table.c.value <= y_max).where(TM.vectorData_table.c.value >= y_min) # join x & y positions over the eventNumber position_join = sqla.join(pxs, pys, pxs.c.eventNumber == pys.c.eventNumber) # select the proper columns positions = sqla.select(pxs.c.moduleName, pxs.c.eventNumber, pxs.c.simtimeRaw , pxs.c.value.label('px'), pys.c.value.label('py') ).select_from(position_join) # get the actual signal values vals = sqla.select(TM.vector_table.c.vectorId, TM.vectorData_table.c.eventNumber, TM.vectorData_table.c.value) \ .where(TM.vector_table.c.vectorName == signal_name \ ).join( TM.vectorData_table , TM.vectorData_table.c.vectorId == TM.vector_table.c.vectorId ) columns = [] if moduleName: columns.append(positions.c.moduleName) if simtimeRaw: columns.append(positions.c.simtimeRaw) if eventNumber: columns.append(positions.c.eventNumber) # join positions with signal values position_vals_join = sqla.join(positions, vals, positions.c.eventNumber == vals.c.eventNumber) query = sqla.select(*columns , positions.c.px.label(value_label_px), positions.c.py.label(value_label_py) , vals.c.value.label(value_label) ).select_from(position_vals_join) return query