Skip to content

API Reference

This page provides the technical documentation for the functions and classes within the Turban Toolbox.


Instruments

Generic instrument API (instruments/generic/api.py)

Dropsonde

Bases: Instrument

Source code in turban/instruments/generic/api.py
11
12
13
14
15
16
17
class Dropsonde(Instrument):
    @abstractmethod
    def to_shear_level1(self) -> "ShearLevel1":
        """
        Convert raw data to shear level 1
        """
        raise NotImplementedError("to_shear_level1 must be implemented in subclasses")

to_shear_level1() abstractmethod

Convert raw data to shear level 1

Source code in turban/instruments/generic/api.py
12
13
14
15
16
17
@abstractmethod
def to_shear_level1(self) -> "ShearLevel1":
    """
    Convert raw data to shear level 1
    """
    raise NotImplementedError("to_shear_level1 must be implemented in subclasses")

Generic instrument configuration (instruments/generic/config.py)

MSS Configuration (instruments/mss/config.py)

MssDeviceConfig

Bases: BaseModel

Source code in turban/instruments/mss/config.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class MssDeviceConfig(BaseModel):
    offset: int = Field(
        default=0, description="16bit offset, typically 0, older devices have -32768"
    )
    sampling_freq: float = Field(
        default=1024.0,
        description="The sampling frequency [Hz] of the microstructure probe",
    )
    gain_utemp: float = Field(
        default=1.5,
        description="Gain of the NTC highpass pre-emphasis differentiator",
    )
    sensors: dict[
        str,
        Annotated[
            Union[
                MssSensor,
                MssSensorPoly,
                MssShearSensor,
                MssSensorPressure,
                MssSensorNTC,
                MssSensorTurb,
                MssSensorOptode,
                MssSensorOptodeInternalTemp,
            ],
            Field(discriminator="calibration_type"),
        ],
    ] = Field(
        default={}, description="A dictionary of the sensors mounted to the probe"
    )
    sensornames_ctd: dict[
        Union[Literal["cond"], Literal["temp"], Literal["press"]],
        str,
    ] = Field(
        default={"cond": "", "temp": "", "press": ""},
        description="A dictionary to link standard ctd names to the names of the MSS config",
    )
    pressure_sensorname: Optional[str] = Field(
        default=None,
        description="The sensorname of the pressure sensor, if None a best guess will be made",
    )
    pspd_rel_method: Literal["pressure", "constant", "external"] = Field(
        default="pressure",
        description="Method for the platform speed relative to the seawater, this is needed to calculate wavenumbers from the sampled data",
    )
    pspd_rel_constant_vel: Optional[float] = Field(
        default=None,
        description='Constant velocity [m/s] used as pspd_rel, if defined by "pspd_rel_method"',
    )

    @classmethod
    def from_mrd(
        cls,
        filename: str | Path,
        shear_sensitivities: dict[str, float],
        offset: int = 0,
    ):
        """Create a MssDeviceConfig from an MRD file.

        Parameters
        ----------
        filename : str or Path
            Path to the MRD file.
        shear_sensitivities : dict[str, float]
            Mapping of sensor name to sensitivity value for shear sensors.
        offset : int, optional
            16-bit count offset, typically 0 or -32768 for older devices
            (default 0).

        Returns
        -------
        MssDeviceConfig
            Configuration object with sensors parsed from the MRD file header.
        """
        self = cls()
        logger.debug("Opening file:{}".format(filename))
        mrd_file = open(filename, "rb")
        data = mss_mrd.read_mrd(filestream=mrd_file, header_only=True)
        logger.debug("Closing file:{}".format(filename))
        mrd_file.close()
        header_raw = data["header"]
        header = mss_mrd.parse_header(header_raw)
        # Check for CTD sensors and link names
        for ctd_sensor in self.sensornames_ctd.keys():
            sensorname_mss = self.sensornames_ctd[ctd_sensor]
            if len(sensorname_mss) == 0:
                logger.debug("Searching for sensor of {}".format(ctd_sensor))
                sensornames = [
                    header["mss"]["channels"][ch]["name"]
                    for ch in header["mss"]["channels"]
                ]
                sensornames_casefold = [
                    header["mss"]["channels"][ch]["name"].casefold()
                    for ch in header["mss"]["channels"]
                ]
                for k in mss_standard_ctd_sensornames[
                    ctd_sensor
                ]:  # Loop over the stanndard names
                    if k.casefold() in sensornames_casefold:
                        index_sensor = sensornames_casefold.index(k.casefold())
                        sensorname_mss = sensornames[index_sensor]
                        logger.debug(
                            "\tFound MSS sensor {} for {}".format(
                                sensorname_mss, ctd_sensor
                            )
                        )
                        self.sensornames_ctd[ctd_sensor] = sensorname_mss
                        break

        # Fill in sensors from header
        for ch in header["mss"]["channels"]:
            sensor_dict = header["mss"]["channels"][ch]
            sensorname = sensor_dict["name"]
            unit = sensor_dict["unit"]
            caltype = sensor_dict["caltype"].upper()
            logger.debug(
                "Checking Channel:{}, sensorname:{}, caltype:{}".format(
                    ch, sensorname, caltype
                )
            )
            if caltype == "N":  # Polynom
                if sensorname.upper().startswith("SHE"):
                    logger.debug("\tAdding shear sensor {}".format(sensorname))
                    sensitivity = shear_sensitivities[sensorname]
                    self.sensors[sensorname] = MssShearSensor(
                        channel=ch,
                        name=sensorname,
                        coefficients=sensor_dict["coeff"],
                        unit=unit,
                        sensitivity=sensitivity,
                    )
                else:
                    logger.debug(
                        "\tAdding standard polynomial sensor {}".format(sensorname)
                    )
                    self.sensors[sensorname] = MssSensorPoly(
                        channel=ch,
                        name=sensorname,
                        coefficients=sensor_dict["coeff"],
                        unit=unit,
                    )
            elif caltype == "SHH":
                logger.debug("\tAdding NTC sensor {}".format(sensorname))
                self.sensors[sensorname] = MssSensorNTC(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
            elif caltype == "P":  # Pressure
                logger.debug("\tAdding pressure sensor {}".format(sensorname))
                self.sensors[sensorname] = MssSensorPressure(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
            elif caltype == "V04":  # Oxygen
                logger.debug("\tAdding oxygen optode sensor {}".format(sensorname))
                self.sensors[sensorname] = MssSensorOptode(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
            elif caltype == "N24":  # Internal temperature of oxygensensor
                logger.debug(
                    "\tAdding oxygen optode temperature sensor {}".format(sensorname)
                )
                self.sensors[sensorname] = MssSensorOptodeInternalTemp(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
            elif caltype == "NFC":  # Turbidity etc.
                logger.debug("\tAdding NFC sensor {}".format(sensorname))
                self.sensors[sensorname] = MssSensorTurb(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
        # print('Header', header['channels'])

        return self

    @classmethod
    def from_prb(cls, filename, shear_sensitivities=[None, None], offset=0):
        """Create a MssDeviceConfig from a PRB file.

        Parameters
        ----------
        filename : str or Path
            Path to the PRB file.
        shear_sensitivities : list, optional
            Shear sensitivity values (default [None, None]).
        offset : int, optional
            16-bit count offset (default 0).

        Raises
        ------
        NotImplementedError
            This method is not yet implemented.
        """
        raise NotImplementedError

from_mrd(filename, shear_sensitivities, offset=0) classmethod

Create a MssDeviceConfig from an MRD file.

Parameters

filename : str or Path Path to the MRD file. shear_sensitivities : dict[str, float] Mapping of sensor name to sensitivity value for shear sensors. offset : int, optional 16-bit count offset, typically 0 or -32768 for older devices (default 0).

Returns

MssDeviceConfig Configuration object with sensors parsed from the MRD file header.

Source code in turban/instruments/mss/config.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
@classmethod
def from_mrd(
    cls,
    filename: str | Path,
    shear_sensitivities: dict[str, float],
    offset: int = 0,
):
    """Create a MssDeviceConfig from an MRD file.

    Parameters
    ----------
    filename : str or Path
        Path to the MRD file.
    shear_sensitivities : dict[str, float]
        Mapping of sensor name to sensitivity value for shear sensors.
    offset : int, optional
        16-bit count offset, typically 0 or -32768 for older devices
        (default 0).

    Returns
    -------
    MssDeviceConfig
        Configuration object with sensors parsed from the MRD file header.
    """
    self = cls()
    logger.debug("Opening file:{}".format(filename))
    mrd_file = open(filename, "rb")
    data = mss_mrd.read_mrd(filestream=mrd_file, header_only=True)
    logger.debug("Closing file:{}".format(filename))
    mrd_file.close()
    header_raw = data["header"]
    header = mss_mrd.parse_header(header_raw)
    # Check for CTD sensors and link names
    for ctd_sensor in self.sensornames_ctd.keys():
        sensorname_mss = self.sensornames_ctd[ctd_sensor]
        if len(sensorname_mss) == 0:
            logger.debug("Searching for sensor of {}".format(ctd_sensor))
            sensornames = [
                header["mss"]["channels"][ch]["name"]
                for ch in header["mss"]["channels"]
            ]
            sensornames_casefold = [
                header["mss"]["channels"][ch]["name"].casefold()
                for ch in header["mss"]["channels"]
            ]
            for k in mss_standard_ctd_sensornames[
                ctd_sensor
            ]:  # Loop over the stanndard names
                if k.casefold() in sensornames_casefold:
                    index_sensor = sensornames_casefold.index(k.casefold())
                    sensorname_mss = sensornames[index_sensor]
                    logger.debug(
                        "\tFound MSS sensor {} for {}".format(
                            sensorname_mss, ctd_sensor
                        )
                    )
                    self.sensornames_ctd[ctd_sensor] = sensorname_mss
                    break

    # Fill in sensors from header
    for ch in header["mss"]["channels"]:
        sensor_dict = header["mss"]["channels"][ch]
        sensorname = sensor_dict["name"]
        unit = sensor_dict["unit"]
        caltype = sensor_dict["caltype"].upper()
        logger.debug(
            "Checking Channel:{}, sensorname:{}, caltype:{}".format(
                ch, sensorname, caltype
            )
        )
        if caltype == "N":  # Polynom
            if sensorname.upper().startswith("SHE"):
                logger.debug("\tAdding shear sensor {}".format(sensorname))
                sensitivity = shear_sensitivities[sensorname]
                self.sensors[sensorname] = MssShearSensor(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                    sensitivity=sensitivity,
                )
            else:
                logger.debug(
                    "\tAdding standard polynomial sensor {}".format(sensorname)
                )
                self.sensors[sensorname] = MssSensorPoly(
                    channel=ch,
                    name=sensorname,
                    coefficients=sensor_dict["coeff"],
                    unit=unit,
                )
        elif caltype == "SHH":
            logger.debug("\tAdding NTC sensor {}".format(sensorname))
            self.sensors[sensorname] = MssSensorNTC(
                channel=ch,
                name=sensorname,
                coefficients=sensor_dict["coeff"],
                unit=unit,
            )
        elif caltype == "P":  # Pressure
            logger.debug("\tAdding pressure sensor {}".format(sensorname))
            self.sensors[sensorname] = MssSensorPressure(
                channel=ch,
                name=sensorname,
                coefficients=sensor_dict["coeff"],
                unit=unit,
            )
        elif caltype == "V04":  # Oxygen
            logger.debug("\tAdding oxygen optode sensor {}".format(sensorname))
            self.sensors[sensorname] = MssSensorOptode(
                channel=ch,
                name=sensorname,
                coefficients=sensor_dict["coeff"],
                unit=unit,
            )
        elif caltype == "N24":  # Internal temperature of oxygensensor
            logger.debug(
                "\tAdding oxygen optode temperature sensor {}".format(sensorname)
            )
            self.sensors[sensorname] = MssSensorOptodeInternalTemp(
                channel=ch,
                name=sensorname,
                coefficients=sensor_dict["coeff"],
                unit=unit,
            )
        elif caltype == "NFC":  # Turbidity etc.
            logger.debug("\tAdding NFC sensor {}".format(sensorname))
            self.sensors[sensorname] = MssSensorTurb(
                channel=ch,
                name=sensorname,
                coefficients=sensor_dict["coeff"],
                unit=unit,
            )
    # print('Header', header['channels'])

    return self

from_prb(filename, shear_sensitivities=[None, None], offset=0) classmethod

Create a MssDeviceConfig from a PRB file.

Parameters

filename : str or Path Path to the PRB file. shear_sensitivities : list, optional Shear sensitivity values (default [None, None]). offset : int, optional 16-bit count offset (default 0).

Raises

NotImplementedError This method is not yet implemented.

Source code in turban/instruments/mss/config.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
@classmethod
def from_prb(cls, filename, shear_sensitivities=[None, None], offset=0):
    """Create a MssDeviceConfig from a PRB file.

    Parameters
    ----------
    filename : str or Path
        Path to the PRB file.
    shear_sensitivities : list, optional
        Shear sensitivity values (default [None, None]).
    offset : int, optional
        16-bit count offset (default 0).

    Raises
    ------
    NotImplementedError
        This method is not yet implemented.
    """
    raise NotImplementedError

MssSensorNTC

Bases: MssSensor

Steinhart/Hart NTC Polynomial

Source code in turban/instruments/mss/config.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class MssSensorNTC(MssSensor):
    """
    Steinhart/Hart NTC Polynomial
    """

    calibration_type: Literal["SHH"] = Field(default="SHH")

    def __init__(self, *args, **kwargs):
        """Initialize and build Steinhart-Hart calibration polynomial.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
            The polynomial is built from all but the last coefficient.
        """
        super().__init__(*args, **kwargs)
        self._p = np.polynomial.Polynomial(self.coefficients[:-1])

    def raw_to_units(self, rawdata, offset):
        """Apply Steinhart-Hart NTC calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw resistance counts.
        offset : float
            Additive offset applied before taking the natural log.

        Returns
        -------
        ndarray
            Temperature in degrees Celsius.
        """
        data = self._p(np.log(rawdata + offset))
        data = 1 / data - 273.15  # Kelvin to degC
        return data

__init__(*args, **kwargs)

Initialize and build Steinhart-Hart calibration polynomial.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit. The polynomial is built from all but the last coefficient.

Source code in turban/instruments/mss/config.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def __init__(self, *args, **kwargs):
    """Initialize and build Steinhart-Hart calibration polynomial.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
        The polynomial is built from all but the last coefficient.
    """
    super().__init__(*args, **kwargs)
    self._p = np.polynomial.Polynomial(self.coefficients[:-1])

raw_to_units(rawdata, offset)

Apply Steinhart-Hart NTC calibration to raw data.

Parameters

rawdata : array_like Raw resistance counts. offset : float Additive offset applied before taking the natural log.

Returns

ndarray Temperature in degrees Celsius.

Source code in turban/instruments/mss/config.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def raw_to_units(self, rawdata, offset):
    """Apply Steinhart-Hart NTC calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw resistance counts.
    offset : float
        Additive offset applied before taking the natural log.

    Returns
    -------
    ndarray
        Temperature in degrees Celsius.
    """
    data = self._p(np.log(rawdata + offset))
    data = 1 / data - 273.15  # Kelvin to degC
    return data

MssSensorOptode

Bases: MssSensor

Convert oxygen optode rawdata to physical units

Source code in turban/instruments/mss/config.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class MssSensorOptode(MssSensor):
    """
    Convert oxygen optode rawdata to physical units
    """

    calibration_type: Literal["V04"] = Field(default="V04")

    def __init__(self, *args, **kwargs):
        """Initialize and build two-stage optode calibration polynomials.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
            Two polynomials are built: `_p1` from coefficients[0:2] (raw-to-mV)
            and `_p2` from coefficients[-2:] (mV-to-O2, zero-point correction).
        """
        super().__init__(*args, **kwargs)
        self._p1 = np.polynomial.Polynomial(
            self.coefficients[0:2]
        )  # Convert data to mV
        self._p2 = np.polynomial.Polynomial(
            self.coefficients[-2:]
        )  # 0 Point correction with B0 and B1

    def raw_to_units(self, rawdata, offset):
        """Apply two-stage oxygen optode calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float
            Additive offset applied in the first conversion stage.

        Returns
        -------
        ndarray
            Calibrated oxygen concentration in physical units.
        """
        data_mV = self._p1(rawdata + offset)
        data = self._p2(data_mV)
        return data

__init__(*args, **kwargs)

Initialize and build two-stage optode calibration polynomials.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit. Two polynomials are built: _p1 from coefficients[0:2] (raw-to-mV) and _p2 from coefficients[-2:] (mV-to-O2, zero-point correction).

Source code in turban/instruments/mss/config.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def __init__(self, *args, **kwargs):
    """Initialize and build two-stage optode calibration polynomials.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
        Two polynomials are built: `_p1` from coefficients[0:2] (raw-to-mV)
        and `_p2` from coefficients[-2:] (mV-to-O2, zero-point correction).
    """
    super().__init__(*args, **kwargs)
    self._p1 = np.polynomial.Polynomial(
        self.coefficients[0:2]
    )  # Convert data to mV
    self._p2 = np.polynomial.Polynomial(
        self.coefficients[-2:]
    )  # 0 Point correction with B0 and B1

raw_to_units(rawdata, offset)

Apply two-stage oxygen optode calibration to raw data.

Parameters

rawdata : array_like Raw sensor counts. offset : float Additive offset applied in the first conversion stage.

Returns

ndarray Calibrated oxygen concentration in physical units.

Source code in turban/instruments/mss/config.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def raw_to_units(self, rawdata, offset):
    """Apply two-stage oxygen optode calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float
        Additive offset applied in the first conversion stage.

    Returns
    -------
    ndarray
        Calibrated oxygen concentration in physical units.
    """
    data_mV = self._p1(rawdata + offset)
    data = self._p2(data_mV)
    return data

MssSensorOptodeInternalTemp

Bases: MssSensor

Source code in turban/instruments/mss/config.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class MssSensorOptodeInternalTemp(MssSensor):
    calibration_type: Literal["N24"] = Field(default="N24")

    def __init__(self, *args, **kwargs):
        """Initialize and build calibration polynomial for optode internal temperature.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
        """
        super().__init__(*args, **kwargs)
        self._p = np.polynomial.Polynomial(self.coefficients)

    def raw_to_units(self, rawdata, offset=0):
        """Apply polynomial calibration to raw optode internal temperature data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float, optional
            Additive offset applied before evaluating the polynomial.

        Returns
        -------
        ndarray
            Calibrated temperature in physical units.
        """
        data = self._p(rawdata + offset)
        return data

__init__(*args, **kwargs)

Initialize and build calibration polynomial for optode internal temperature.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit.

Source code in turban/instruments/mss/config.py
284
285
286
287
288
289
290
291
292
293
294
295
296
def __init__(self, *args, **kwargs):
    """Initialize and build calibration polynomial for optode internal temperature.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
    """
    super().__init__(*args, **kwargs)
    self._p = np.polynomial.Polynomial(self.coefficients)

raw_to_units(rawdata, offset=0)

Apply polynomial calibration to raw optode internal temperature data.

Parameters

rawdata : array_like Raw sensor counts. offset : float, optional Additive offset applied before evaluating the polynomial.

Returns

ndarray Calibrated temperature in physical units.

Source code in turban/instruments/mss/config.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def raw_to_units(self, rawdata, offset=0):
    """Apply polynomial calibration to raw optode internal temperature data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float, optional
        Additive offset applied before evaluating the polynomial.

    Returns
    -------
    ndarray
        Calibrated temperature in physical units.
    """
    data = self._p(rawdata + offset)
    return data

MssSensorPoly

Bases: MssSensor

Source code in turban/instruments/mss/config.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class MssSensorPoly(MssSensor):
    calibration_type: Literal["N"] = Field(default="N")

    def __init__(self, *args, **kwargs):
        """Initialize and build the calibration polynomial from coefficients.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
        """
        super().__init__(*args, **kwargs)
        self._p = np.polynomial.Polynomial(self.coefficients)

    def raw_to_units(self, rawdata, offset=0):
        """Apply polynomial calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float, optional
            Additive offset applied before evaluating the polynomial.

        Returns
        -------
        ndarray
            Calibrated data in physical units.
        """
        data = self._p(rawdata + offset)
        return data

__init__(*args, **kwargs)

Initialize and build the calibration polynomial from coefficients.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit.

Source code in turban/instruments/mss/config.py
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, *args, **kwargs):
    """Initialize and build the calibration polynomial from coefficients.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
    """
    super().__init__(*args, **kwargs)
    self._p = np.polynomial.Polynomial(self.coefficients)

raw_to_units(rawdata, offset=0)

Apply polynomial calibration to raw data.

Parameters

rawdata : array_like Raw sensor counts. offset : float, optional Additive offset applied before evaluating the polynomial.

Returns

ndarray Calibrated data in physical units.

Source code in turban/instruments/mss/config.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def raw_to_units(self, rawdata, offset=0):
    """Apply polynomial calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float, optional
        Additive offset applied before evaluating the polynomial.

    Returns
    -------
    ndarray
        Calibrated data in physical units.
    """
    data = self._p(rawdata + offset)
    return data

MssSensorPressure

Bases: MssSensor

Source code in turban/instruments/mss/config.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class MssSensorPressure(MssSensor):
    calibration_type: Literal["P"] = Field(default="P")

    def __init__(self, *args, **kwargs):
        """Initialize and build pressure calibration polynomial.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
            The polynomial is built from all but the last coefficient.
        """
        super().__init__(*args, **kwargs)
        self._p = np.polynomial.Polynomial(self.coefficients[:-1])

    def raw_to_units(self, rawdata, offset=0):
        """Apply pressure calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float, optional
            Additive offset applied before evaluating the polynomial.

        Returns
        -------
        ndarray
            Calibrated pressure; the last coefficient is subtracted as a zero-offset.
        """
        data = self._p(rawdata + offset) - self.coefficients[-1]
        return data

__init__(*args, **kwargs)

Initialize and build pressure calibration polynomial.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit. The polynomial is built from all but the last coefficient.

Source code in turban/instruments/mss/config.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def __init__(self, *args, **kwargs):
    """Initialize and build pressure calibration polynomial.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
        The polynomial is built from all but the last coefficient.
    """
    super().__init__(*args, **kwargs)
    self._p = np.polynomial.Polynomial(self.coefficients[:-1])

raw_to_units(rawdata, offset=0)

Apply pressure calibration to raw data.

Parameters

rawdata : array_like Raw sensor counts. offset : float, optional Additive offset applied before evaluating the polynomial.

Returns

ndarray Calibrated pressure; the last coefficient is subtracted as a zero-offset.

Source code in turban/instruments/mss/config.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def raw_to_units(self, rawdata, offset=0):
    """Apply pressure calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float, optional
        Additive offset applied before evaluating the polynomial.

    Returns
    -------
    ndarray
        Calibrated pressure; the last coefficient is subtracted as a zero-offset.
    """
    data = self._p(rawdata + offset) - self.coefficients[-1]
    return data

MssSensorTurb

Bases: MssSensor

Convert rawdata turbidity to NFC

Source code in turban/instruments/mss/config.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class MssSensorTurb(MssSensor):
    """
    Convert rawdata turbidity to NFC
    """

    calibration_type: Literal["NFC"] = Field(default="NFC")

    def __init__(self, *args, **kwargs):
        """Initialize and build turbidity calibration polynomial.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `coefficients`, `channel`, and optional `unit`.
            The polynomial is built from all but the last coefficient.
        """
        super().__init__(*args, **kwargs)
        self._p = np.polynomial.Polynomial(self.coefficients[:-1])

    def raw_to_units(self, rawdata, offset):
        """Apply turbidity (NFC) calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float
            Additive offset applied before polynomial evaluation.

        Returns
        -------
        ndarray
            Calibrated turbidity in physical units.
        """
        p = np.polynomial.Polynomial(self.coefficients[:-2])
        data = p(rawdata + offset) * self.coefficients[-1] + self.coefficients[-2]
        return data

__init__(*args, **kwargs)

Initialize and build turbidity calibration polynomial.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, coefficients, channel, and optional unit. The polynomial is built from all but the last coefficient.

Source code in turban/instruments/mss/config.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def __init__(self, *args, **kwargs):
    """Initialize and build turbidity calibration polynomial.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `coefficients`, `channel`, and optional `unit`.
        The polynomial is built from all but the last coefficient.
    """
    super().__init__(*args, **kwargs)
    self._p = np.polynomial.Polynomial(self.coefficients[:-1])

raw_to_units(rawdata, offset)

Apply turbidity (NFC) calibration to raw data.

Parameters

rawdata : array_like Raw sensor counts. offset : float Additive offset applied before polynomial evaluation.

Returns

ndarray Calibrated turbidity in physical units.

Source code in turban/instruments/mss/config.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def raw_to_units(self, rawdata, offset):
    """Apply turbidity (NFC) calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float
        Additive offset applied before polynomial evaluation.

    Returns
    -------
    ndarray
        Calibrated turbidity in physical units.
    """
    p = np.polynomial.Polynomial(self.coefficients[:-2])
    data = p(rawdata + offset) * self.coefficients[-1] + self.coefficients[-2]
    return data

MssShearSensor

Bases: MssSensor

Source code in turban/instruments/mss/config.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class MssShearSensor(MssSensor):
    sensitivity: float
    serial_number: str = Field(default="")
    reference_temperature: float = Field(default=np.nan)
    calibration_date: str = Field(default="")
    calibration_type: Literal["SHE"] = Field(default="SHE")

    def __init__(self, *args, **kwargs):
        """Initialize and compute calibration coefficients from sensitivity.

        Parameters
        ----------
        *args
            Positional arguments passed to pydantic BaseModel.
        **kwargs
            Keyword arguments passed to pydantic BaseModel. Expected keys include
            `name`, `sensitivity`, `channel`, and optional `unit`.
            Coefficients are derived from `sensitivity` and stored in `coefficients`.
        """
        super().__init__(*args, **kwargs)
        self.coefficients = [None, None]
        self.coefficients[0] = 1.47133e-6 / self.sensitivity
        self.coefficients[1] = 2.94266e-6 / self.sensitivity
        self._p = np.polynomial.Polynomial(self.coefficients)

    def raw_to_units(self, rawdata, offset=0):
        """Apply shear sensor calibration to raw data.

        Parameters
        ----------
        rawdata : array_like
            Raw sensor counts.
        offset : float, optional
            Subtractive offset (shear sensors use negative offset convention).

        Returns
        -------
        ndarray
            Calibrated shear in physical units.
        """
        data = self._p(rawdata - offset)  # The shear sensors have the negative offset
        return data

__init__(*args, **kwargs)

Initialize and compute calibration coefficients from sensitivity.

Parameters

args Positional arguments passed to pydantic BaseModel. *kwargs Keyword arguments passed to pydantic BaseModel. Expected keys include name, sensitivity, channel, and optional unit. Coefficients are derived from sensitivity and stored in coefficients.

Source code in turban/instruments/mss/config.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(self, *args, **kwargs):
    """Initialize and compute calibration coefficients from sensitivity.

    Parameters
    ----------
    *args
        Positional arguments passed to pydantic BaseModel.
    **kwargs
        Keyword arguments passed to pydantic BaseModel. Expected keys include
        `name`, `sensitivity`, `channel`, and optional `unit`.
        Coefficients are derived from `sensitivity` and stored in `coefficients`.
    """
    super().__init__(*args, **kwargs)
    self.coefficients = [None, None]
    self.coefficients[0] = 1.47133e-6 / self.sensitivity
    self.coefficients[1] = 2.94266e-6 / self.sensitivity
    self._p = np.polynomial.Polynomial(self.coefficients)

raw_to_units(rawdata, offset=0)

Apply shear sensor calibration to raw data.

Parameters

rawdata : array_like Raw sensor counts. offset : float, optional Subtractive offset (shear sensors use negative offset convention).

Returns

ndarray Calibrated shear in physical units.

Source code in turban/instruments/mss/config.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def raw_to_units(self, rawdata, offset=0):
    """Apply shear sensor calibration to raw data.

    Parameters
    ----------
    rawdata : array_like
        Raw sensor counts.
    offset : float, optional
        Subtractive offset (shear sensors use negative offset convention).

    Returns
    -------
    ndarray
        Calibrated shear in physical units.
    """
    data = self._p(rawdata - offset)  # The shear sensors have the negative offset
    return data

MSS MRD file format (instruments/mss/mss_mrd.py)

parse_header(header)

Parse an ASCII MRD file header into a configuration dictionary.

Extracts metadata from the ASCII header of a Microstructure Raw Data (MRD) file, including ship, cruise, date, and channel calibration coefficients.

Parameters

header : str ASCII header text from the MRD file.

Returns

dict Configuration dictionary with keys 'ship', 'cruise', 'date_pc', and 'mss' (which contains a nested dict with channel information).

Source code in turban/instruments/mss/mss_mrd.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def parse_header(header):
    """Parse an ASCII MRD file header into a configuration dictionary.

    Extracts metadata from the ASCII header of a Microstructure Raw Data (MRD)
    file, including ship, cruise, date, and channel calibration coefficients.

    Parameters
    ----------
    header : str
        ASCII header text from the MRD file.

    Returns
    -------
    dict
        Configuration dictionary with keys 'ship', 'cruise', 'date_pc', and
        'mss' (which contains a nested dict with channel information).
    """
    config = {"mss": {"channels": {}}}
    ind_ship = header.find("Ship    :   ") + len("Ship    :   ")
    ship = header[ind_ship : ind_ship + 8]
    ship = ship.rstrip("_")
    config["ship"] = ship

    ind_cruise = header.find("Cruise:") + len("Cruise:")
    cruise = header[ind_cruise + 1 : ind_cruise + 18]
    cruise = cruise.rstrip("_")
    config["cruise"] = cruise

    # hs = header.split('\\r')
    hs = header.splitlines()
    sensor_str = []

    logger.debug("PC-Time line:{}".format(hs[2]))
    config["date_pc"] = parse_date(hs[2]).isoformat()
    # print('Config date',config['date_pc'])
    # print('HS')
    # print('hs',hs)
    # print('HS done')
    # print('Type',type(header))
    # Loop over all channels
    # for i in range(17,len(hs)-1):
    for i in range(len(hs)):
        # logger.debug('Testing line {:s}'.format(hs[i]))
        hstmp = re.sub(r"\s+", " ", hs[i])  # replace multiple blanks with one
        hsp = hstmp.split(" ")
        # check if we have a sensor line
        # each sensor should have 12 entries
        # print('hsp',hsp,len(hsp))
        if len(hsp) >= 12:
            FLAG_sensor = True
            try:
                devicenum = int(hsp[0].replace(" ", ""))
            except:
                FLAG_sensor = False
            try:
                channelnum = int(hsp[2].replace(" ", ""))
            except:
                FLAG_sensor = False
        else:
            FLAG_sensor = False

        if FLAG_sensor:
            # logger.debug('Found sensor')
            sensor_str.append(hs[i])
            devicename = hsp[1].replace(" ", "")
            if "MSS" in devicename:  # Treat the MSS here
                config["mss"]["name"] = devicename
                config["mss"]["devicenum"] = devicenum
                channelnum = int(hsp[2].replace(" ", ""))
                channel = hsp[4].replace(" ", "")
                unit = hsp[5].replace(" ", "")
                caltype = hsp[3].replace(" ", "")
                config["mss"]["channels"][channelnum] = {}
                config["mss"]["channels"][channelnum]["unit"] = unit
                config["mss"]["channels"][channelnum]["name"] = channel
                config["mss"]["channels"][channelnum]["caltype"] = caltype
                poly = []
                # There are 6 coefficients for each channel
                for val in hsp[6:12]:
                    poly.append(float(val))

                config["mss"]["channels"][channelnum]["coeff"] = numpy.asarray(poly)
                # print(devicename,channelnum,channel)
                # if(hs[i].upper().find('COUNT') >=0):
                #    mss = hs[i].split(' ')[1]

    return config

read_mrd(filestream, header_only=False, pos_time_only=False)

Read a binaray SST MRD (Microstructure Raw Data) file

Parameters

filestream header_only pos_time_only

Source code in turban/instruments/mss/mss_mrd.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def read_mrd(filestream, header_only=False, pos_time_only=False) -> dict:
    """Read a binaray SST MRD (Microstructure Raw Data) file

    Parameters
    ----------
    filestream
    header_only
    pos_time_only

    """
    f = filestream

    n = 0
    np = 0  # Number of packets
    end_of_header = 0
    end_of_header_tmp = 0
    IN_HEADER = True
    header = b""
    HAVE_TIME = False
    HAVE_POS = False

    # The data
    data = {"date": [], "gps": []}
    data_tmp = {7: [], 8: []}
    nread = 4096
    bind = 0
    btmp = f.read(nread)
    logger.info("Start reading file")
    while True:
        if HAVE_TIME & HAVE_POS & pos_time_only:
            # print('Found time and pos, breaking at',n)
            break

        if len(btmp) < 17:
            btmp = btmp + f.read(nread)

        if IN_HEADER:
            # b = f.read(1)
            b = btmp[0:1]
            btmp = btmp[1:]
            bind += 1
            n += 1
        else:
            b = btmp[0:17]
            btmp = btmp[17:]
            bind += 17
            n += 17
            np += 1  # Number of 17 bytes long data packets (packet type + 8 words (little endian)

        if len(b) == 0:  # Nothing left anymore, break
            break
        else:
            # We are in the data body, lets get our data
            if IN_HEADER == False:
                bword = []
                for i in range(1, 16, 2):  # Reading 16 bytes and treat them as words
                    bword.append(int.from_bytes(b[i : i + 2], byteorder="little"))
                if b[0] == 1:  # Time packet of gps
                    if HAVE_TIME == False:
                        year = int(bword[0])
                        year = bword[0]
                        month = bword[1]
                        day = bword[2]
                        hour = bword[4]
                        minute = bword[5]
                        second = bword[6]
                        date = datetime.datetime(
                            year,
                            month,
                            day,
                            hour,
                            minute,
                            second,
                            tzinfo=timezone("UTC"),
                        )
                        data["date"].append([np, date])
                        HAVE_TIME = True

                elif b[0] == 3:  # Position packet of gps
                    if HAVE_POS == False:
                        # Latitude
                        latint = int.from_bytes(b[1:3], byteorder="little")
                        latsign = (latint & 0x8000) >> 15
                        if latsign == 0:
                            latsign = -1
                        lat = latint & 0x1FFF
                        latdeg = (lat - lat % 100) / 100
                        latmindec = int.from_bytes(b[3:5], byteorder="little")
                        if abs(latmindec) > 0:
                            digits = int(math.log10(latmindec)) + 1
                        else:
                            digits = 0

                        latmin = lat % 100 + latmindec / 10**digits
                        latdec = latsign * (latdeg + latmin / 60)
                        # Longitude
                        lonint = int.from_bytes(b[5:7], byteorder="little")
                        lonsign = (lonint & 0x8000) >> 15
                        if lonsign == 0:
                            lonsign = -1
                        lon = lonint & 0x1FFF
                        londeg = (lon - lon % 100) / 100
                        lonmindec = int.from_bytes(b[7:9], byteorder="little")
                        if abs(lonmindec) > 0:
                            digits = int(math.log10(lonmindec)) + 1
                        else:
                            digits = 0

                        lonmin = lon % 100 + lonmindec / 10**digits
                        londec = lonsign * (londeg + lonmin / 60)
                        # Daytime
                        tmp = (bword[6]) + (bword[4] & 0x000F) * 100000
                        hour = int((tmp - tmp % 10000) / 10000)
                        tmp2 = tmp % 10000
                        minute = int((tmp2 - tmp2 % 100) / 100)
                        second = tmp2 % 100
                        data["gps"].append(
                            [np, londec, latdec, lon, lat, hour, minute, second]
                        )
                        HAVE_POS = True

                elif b[0] == 7:  # Channels 0-7 od ADC
                    bc = numpy.frombuffer(b[1:], dtype=numpy.uint16)
                    data_tmp[7].append(bc)

                elif b[0] == 8:  # Channels 8-15 of ADC
                    bc = numpy.frombuffer(b[1:], dtype=numpy.uint16)
                    data_tmp[8].append(bc)

            else:  # We are in the header
                header += b
                if (
                    b == b"\x1a"
                ):  # Header is filled up with 0x1A until it is dividable by 17
                    end_of_header = n
                    data["end_of_header"] = end_of_header
                    # Check if dividable by 17
                    if (end_of_header) % 17 == 0:  # Dividable by 17
                        data["valid_mrd"] = True
                        # print('Found a valid header (dividable by 17)')
                        try:
                            data["header"] = header.decode("UTF-8")
                        except:
                            logger.info(
                                "UTF-8 did not work, trying to decode using Window charset"
                            )
                            data["header"] = header.decode("Windows-1252")
                        IN_HEADER = False
                        if header_only:
                            return data

    numsamples = min([len(data_tmp[7]), len(data_tmp[8])])
    data["numsamples"] = numsamples
    data["channels"] = numpy.hstack(
        [
            numpy.asarray((data_tmp[7][:numsamples])),
            numpy.asarray((data_tmp[8][:numsamples])),
        ]
    )
    return data

MSS HHL raw data format (instruments/mss/mss_hhl.py)

hhl

A processor for the HHL binary datastream from Sea & Sun Technology.

Source code in turban/instruments/mss/mss_hhl.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
class hhl:
    """A processor for the HHL binary datastream from Sea & Sun Technology."""

    def __init__(self, config={}):
        self.config = config
        self.buffer = b""  # a binary buffer for the rawdata
        self.ngood = 0
        self.nbad = 0

    def add_to_buffer(self, data):
        """Append data to the internal buffer.

        Parameters
        ----------
        data : bytes
            Data to append to the buffer.
        """
        self.buffer += data

    def align_buffer(self):
        """Align buffer to channel 0.

        Returns
        -------
        None or bool
            None if alignment failed, True if buffer is valid and ready to process.
        """

        self.FLAG_VALID_HHL = False
        self.FLAG_VALID_BUFFER = False

        # check for a valid HHL packet first, if found loop over packets and seek for channel 0, if found check if buffer is still enough to create one datapacket
        # Seek for a start, find two "H" and one "L" pattern
        n = len(self.buffer)
        logger.debug("Processing length %d", n)
        if n >= 48:  # We need at least 3*16 bytes for one complete datapacket
            # for i in range(3):
            for i in range(16):  # Check 16 bytes
                data_tmp = self.decode_HHL(self.buffer)
                # Check if the datastream is valid, if not delete the first byte and try again
                if data_tmp is not None:
                    logger.debug("Found valid HHL packet after %d", i)
                    self.FLAG_VALID_HHL = True
                    channel = data_tmp[0]
                    data = data_tmp[1]
                    break
                else:
                    # print('Did not found valid HHL packet')
                    self.buffer = self.buffer[1:]

            logger.info("Valid %s", self.FLAG_VALID_HHL)
            # Found valid HHL, now search for channel 0
            if self.FLAG_VALID_HHL == False:
                logger.debug("No valid hhl")
                return None
            else:
                # for i in range(0,16):
                for i in range(0, 20):
                    if channel == 0:  # Channel 0, great!
                        logger.debug("Found channel 0")
                        n = len(self.buffer)
                        if (
                            n >= 48
                        ):  # We need at least 3*16 bytes for one complete datapacket
                            logger.debug("Enough data to decode")
                            self.FLAG_VALID_BUFFER = True
                            return True
                        # print('Found channel 0')
                        break
                    else:  # throw data away and check next three bytes
                        self.buffer = self.buffer[3:]
                        [channel, data] = self.decode_HHL(self.buffer)

    def process_buffer(self):
        """Process data in the buffer.

        Returns
        -------
        list of (ndarray, ndarray) or None
            [channel_cat, data_cat] where channel_cat is an ndarray of channel
            numbers and data_cat is an ndarray of shape (n_packets, 16) with
            decoded data values. Returns None if buffer cannot be aligned.
        """
        self.FLAG_VALID_HHL = True  # legacy
        self.FLAG_VALID_BUFFER = True  # legacy
        nbad = 0

        align = self.align_buffer()
        if align is None:
            logger.debug("Could not align buffer")
            return None
        else:
            # If we have a valid buffer, starts with channel 0 and has at least 48 bytes
            # if self.FLAG_VALID_BUFFER:
            if True:
                logger.debug("Processing rawdata")
                data_cat = []
                channel_cat = []
                while True:
                    if len(self.buffer) < 48:  # We need at least 3 bytes to process
                        logger.debug("Not enough data left")
                        break
                    else:  # Process data
                        data_tmp = []
                        for i in range(0, 16):
                            # print(i,len(self.buffer))
                            data_proc = self.decode_HHL(self.buffer[0:3])
                            if data_proc is not None:
                                channel = data_proc[0]
                                data = data_proc[1]
                            else:
                                logger.debug("Problem decoding, realigning %s", data_proc)
                                align = self.align_buffer()
                                if align is None:
                                    logger.debug("Could not realign align buffer")
                                    return None

                            self.buffer = self.buffer[3:]
                            channel_cat.append(channel)
                            if channel == i:
                                data_tmp.append(data)
                            else:
                                logger.debug("Bad channel %d %d", channel, i)
                                align = self.align_buffer()
                                if align is None:
                                    logger.debug("Could not realign align buffer")
                                    return None
                                # return None

                            # print('Channel',channel)
                            # print('data {:04x}'.format(data))

                        # print('Len',len(data_tmp))
                        if len(data_tmp) == 16:
                            data_cat.append(data_tmp)
                            # print('Data tmp',data_tmp)
                            self.ngood += 1
                        else:
                            self.nbad += 1
                channel_cat = np.asarray(channel_cat)
                data_cat = np.asarray(data_cat)
                data_return = [channel_cat, data_cat]
                logger.info("N Good, N Bad hhl %d %d", self.ngood, self.nbad)
                return data_return

    def process_buffer_legacy(self):
        """Process data in the buffer (legacy version).

        Returns
        -------
        list of (ndarray, ndarray) or None
            [channel_cat, data_cat] where channel_cat is an ndarray of channel
            numbers and data_cat is an ndarray of shape (n_packets, 16) with
            decoded data values. Returns None if buffer cannot be aligned.
        """
        self.FLAG_VALID_HHL = False
        self.FLAG_VALID_BUFFER = False

        # check for a valid HHL packet first, if found loop over packets and seek for channel 0, if found check if buffer is still enough to create one datapacket
        # Seek for a start, find two "H" and one "L" pattern
        n = len(self.buffer)
        logger.debug("Processing length %d", n)
        if n >= 48:  # We need at least 3*16 bytes for one complete datapacket
            for i in range(3):
                data_tmp = self.decode_HHL(self.buffer)
                # Check if the datastream is valid, if not delete the first byte and try again
                if data_tmp is not None:
                    logger.debug('Found valid HHL packet')
                    self.FLAG_VALID_HHL = True
                    channel = data_tmp[0]
                    data = data_tmp[1]
                    break
                else:
                    logger.debug('Did not find valid HHL packet')
                    self.buffer = self.buffer[1:]

            logger.debug("Valid %s", self.FLAG_VALID_HHL)
            # Found valid HHL, now search for channel 0
            if self.FLAG_VALID_HHL == False:
                logger.debug("No valid hhl")
            else:
                # for i in range(0,16):
                for i in range(0, 20):
                    if channel == 0:  # Channel 0, great!
                        # logger.debug('Found channel 0')
                        n = len(self.buffer)
                        if (
                            n >= 48
                        ):  # We need at least 3*16 bytes for one complete datapacket
                            logger.debug("Enough data to decode")
                            self.FLAG_VALID_BUFFER = True
                        # print('Found channel 0')
                        break
                    else:  # throw data away and check next three bytes
                        self.buffer = self.buffer[3:]
                        [channel, data] = self.decode_HHL(self.buffer)

            # If we have a valid buffer, starts with channel 0 and has at least 48 bytes
            if self.FLAG_VALID_BUFFER:
                logger.debug("Processing rawdata")
                data_cat = []
                channel_cat = []
                while True:
                    if len(self.buffer) < 48:  # We need at least 3 bytes to process
                        logger.debug("Not enough data left")
                        break
                    else:  # Process data
                        data_tmp = []
                        for i in range(0, 16):
                            logger.debug("%d %d", i, len(self.buffer))
                            data_proc = self.decode_HHL(self.buffer[0:3])
                            if data_proc is not None:
                                channel = data_proc[0]
                                data = data_proc[1]
                            else:
                                logger.debug("Problem decoding %s", data_proc)
                                return None
                            self.buffer = self.buffer[3:]
                            channel_cat.append(channel)
                            if channel == i:
                                data_tmp.append(data)
                            else:
                                logger.debug("Bad channel %d %d", channel, i)
                                return None

                            # logger.debug('Channel %d', channel)
                            # logger.debug('data {:04x}'.format(data))

                        data_cat.append(data_tmp)
                channel_cat = np.asarray(channel_cat)
                data_cat = np.asarray(data_cat)
                data_return = [channel_cat, data_cat]
                # print('Data hhl',data_cat[-1])
                return data_return

    def decode_HHL(self, hhldata):
        """Decode three bytes of HHL data into channel and value.

        Parameters
        ----------
        hhldata : bytes
            HHL packet data, length >= 3.

        Returns
        -------
        list of (int, int) or None
            [channel, data] where channel is the channel number and data is
            the decoded 16-bit value. Returns None if the packet is invalid.
        """
        # Check if its a valid packet
        if len(hhldata) >= 2:
            # print('data',data,data[0:1])
            FLAG0 = hhldata[0] & 0x01 == 1
            FLAG1 = hhldata[1] & 0x01 == 1
            FLAG2 = hhldata[2] & 0x01 == 0
            if FLAG0 and FLAG1 and FLAG2:
                pass
            else:
                return None
        else:
            return None

        HHL0 = hhldata[0]
        HHL1 = hhldata[1]
        HHL2 = hhldata[2]
        # print("HHL: {:2x} {:2x} {:2x}".format(HHL0, HHL1, HHL2))
        channel = HHL2 >> 3
        data = HHL0 >> 1
        data = data | ((HHL1 & 0xFE) << 6)
        data = data | ((HHL2 & 0x06) << 13)
        return [channel, data]

    def valid_packet(self, data):
        """Check if the data packet has a valid HHL pattern.

        Parameters
        ----------
        data : bytes
            Data to validate.

        Returns
        -------
        bool
            True if the first three bytes have the HHL pattern, False otherwise.
        """
        if len(data) >= 2:
            # print('data',data,data[0:1])
            FLAG0 = data[0] & 0x01 == 1
            FLAG1 = data[1] & 0x01 == 1
            FLAG2 = data[2] & 0x01 == 0
            if FLAG0 and FLAG1 and FLAG2:
                return True
            else:
                return False
        else:
            return False

add_to_buffer(data)

Append data to the internal buffer.

Parameters

data : bytes Data to append to the buffer.

Source code in turban/instruments/mss/mss_hhl.py
17
18
19
20
21
22
23
24
25
def add_to_buffer(self, data):
    """Append data to the internal buffer.

    Parameters
    ----------
    data : bytes
        Data to append to the buffer.
    """
    self.buffer += data

align_buffer()

Align buffer to channel 0.

Returns

None or bool None if alignment failed, True if buffer is valid and ready to process.

Source code in turban/instruments/mss/mss_hhl.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def align_buffer(self):
    """Align buffer to channel 0.

    Returns
    -------
    None or bool
        None if alignment failed, True if buffer is valid and ready to process.
    """

    self.FLAG_VALID_HHL = False
    self.FLAG_VALID_BUFFER = False

    # check for a valid HHL packet first, if found loop over packets and seek for channel 0, if found check if buffer is still enough to create one datapacket
    # Seek for a start, find two "H" and one "L" pattern
    n = len(self.buffer)
    logger.debug("Processing length %d", n)
    if n >= 48:  # We need at least 3*16 bytes for one complete datapacket
        # for i in range(3):
        for i in range(16):  # Check 16 bytes
            data_tmp = self.decode_HHL(self.buffer)
            # Check if the datastream is valid, if not delete the first byte and try again
            if data_tmp is not None:
                logger.debug("Found valid HHL packet after %d", i)
                self.FLAG_VALID_HHL = True
                channel = data_tmp[0]
                data = data_tmp[1]
                break
            else:
                # print('Did not found valid HHL packet')
                self.buffer = self.buffer[1:]

        logger.info("Valid %s", self.FLAG_VALID_HHL)
        # Found valid HHL, now search for channel 0
        if self.FLAG_VALID_HHL == False:
            logger.debug("No valid hhl")
            return None
        else:
            # for i in range(0,16):
            for i in range(0, 20):
                if channel == 0:  # Channel 0, great!
                    logger.debug("Found channel 0")
                    n = len(self.buffer)
                    if (
                        n >= 48
                    ):  # We need at least 3*16 bytes for one complete datapacket
                        logger.debug("Enough data to decode")
                        self.FLAG_VALID_BUFFER = True
                        return True
                    # print('Found channel 0')
                    break
                else:  # throw data away and check next three bytes
                    self.buffer = self.buffer[3:]
                    [channel, data] = self.decode_HHL(self.buffer)

decode_HHL(hhldata)

Decode three bytes of HHL data into channel and value.

Parameters

hhldata : bytes HHL packet data, length >= 3.

Returns

list of (int, int) or None [channel, data] where channel is the channel number and data is the decoded 16-bit value. Returns None if the packet is invalid.

Source code in turban/instruments/mss/mss_hhl.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def decode_HHL(self, hhldata):
    """Decode three bytes of HHL data into channel and value.

    Parameters
    ----------
    hhldata : bytes
        HHL packet data, length >= 3.

    Returns
    -------
    list of (int, int) or None
        [channel, data] where channel is the channel number and data is
        the decoded 16-bit value. Returns None if the packet is invalid.
    """
    # Check if its a valid packet
    if len(hhldata) >= 2:
        # print('data',data,data[0:1])
        FLAG0 = hhldata[0] & 0x01 == 1
        FLAG1 = hhldata[1] & 0x01 == 1
        FLAG2 = hhldata[2] & 0x01 == 0
        if FLAG0 and FLAG1 and FLAG2:
            pass
        else:
            return None
    else:
        return None

    HHL0 = hhldata[0]
    HHL1 = hhldata[1]
    HHL2 = hhldata[2]
    # print("HHL: {:2x} {:2x} {:2x}".format(HHL0, HHL1, HHL2))
    channel = HHL2 >> 3
    data = HHL0 >> 1
    data = data | ((HHL1 & 0xFE) << 6)
    data = data | ((HHL2 & 0x06) << 13)
    return [channel, data]

process_buffer()

Process data in the buffer.

Returns

list of (ndarray, ndarray) or None [channel_cat, data_cat] where channel_cat is an ndarray of channel numbers and data_cat is an ndarray of shape (n_packets, 16) with decoded data values. Returns None if buffer cannot be aligned.

Source code in turban/instruments/mss/mss_hhl.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def process_buffer(self):
    """Process data in the buffer.

    Returns
    -------
    list of (ndarray, ndarray) or None
        [channel_cat, data_cat] where channel_cat is an ndarray of channel
        numbers and data_cat is an ndarray of shape (n_packets, 16) with
        decoded data values. Returns None if buffer cannot be aligned.
    """
    self.FLAG_VALID_HHL = True  # legacy
    self.FLAG_VALID_BUFFER = True  # legacy
    nbad = 0

    align = self.align_buffer()
    if align is None:
        logger.debug("Could not align buffer")
        return None
    else:
        # If we have a valid buffer, starts with channel 0 and has at least 48 bytes
        # if self.FLAG_VALID_BUFFER:
        if True:
            logger.debug("Processing rawdata")
            data_cat = []
            channel_cat = []
            while True:
                if len(self.buffer) < 48:  # We need at least 3 bytes to process
                    logger.debug("Not enough data left")
                    break
                else:  # Process data
                    data_tmp = []
                    for i in range(0, 16):
                        # print(i,len(self.buffer))
                        data_proc = self.decode_HHL(self.buffer[0:3])
                        if data_proc is not None:
                            channel = data_proc[0]
                            data = data_proc[1]
                        else:
                            logger.debug("Problem decoding, realigning %s", data_proc)
                            align = self.align_buffer()
                            if align is None:
                                logger.debug("Could not realign align buffer")
                                return None

                        self.buffer = self.buffer[3:]
                        channel_cat.append(channel)
                        if channel == i:
                            data_tmp.append(data)
                        else:
                            logger.debug("Bad channel %d %d", channel, i)
                            align = self.align_buffer()
                            if align is None:
                                logger.debug("Could not realign align buffer")
                                return None
                            # return None

                        # print('Channel',channel)
                        # print('data {:04x}'.format(data))

                    # print('Len',len(data_tmp))
                    if len(data_tmp) == 16:
                        data_cat.append(data_tmp)
                        # print('Data tmp',data_tmp)
                        self.ngood += 1
                    else:
                        self.nbad += 1
            channel_cat = np.asarray(channel_cat)
            data_cat = np.asarray(data_cat)
            data_return = [channel_cat, data_cat]
            logger.info("N Good, N Bad hhl %d %d", self.ngood, self.nbad)
            return data_return

process_buffer_legacy()

Process data in the buffer (legacy version).

Returns

list of (ndarray, ndarray) or None [channel_cat, data_cat] where channel_cat is an ndarray of channel numbers and data_cat is an ndarray of shape (n_packets, 16) with decoded data values. Returns None if buffer cannot be aligned.

Source code in turban/instruments/mss/mss_hhl.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def process_buffer_legacy(self):
    """Process data in the buffer (legacy version).

    Returns
    -------
    list of (ndarray, ndarray) or None
        [channel_cat, data_cat] where channel_cat is an ndarray of channel
        numbers and data_cat is an ndarray of shape (n_packets, 16) with
        decoded data values. Returns None if buffer cannot be aligned.
    """
    self.FLAG_VALID_HHL = False
    self.FLAG_VALID_BUFFER = False

    # check for a valid HHL packet first, if found loop over packets and seek for channel 0, if found check if buffer is still enough to create one datapacket
    # Seek for a start, find two "H" and one "L" pattern
    n = len(self.buffer)
    logger.debug("Processing length %d", n)
    if n >= 48:  # We need at least 3*16 bytes for one complete datapacket
        for i in range(3):
            data_tmp = self.decode_HHL(self.buffer)
            # Check if the datastream is valid, if not delete the first byte and try again
            if data_tmp is not None:
                logger.debug('Found valid HHL packet')
                self.FLAG_VALID_HHL = True
                channel = data_tmp[0]
                data = data_tmp[1]
                break
            else:
                logger.debug('Did not find valid HHL packet')
                self.buffer = self.buffer[1:]

        logger.debug("Valid %s", self.FLAG_VALID_HHL)
        # Found valid HHL, now search for channel 0
        if self.FLAG_VALID_HHL == False:
            logger.debug("No valid hhl")
        else:
            # for i in range(0,16):
            for i in range(0, 20):
                if channel == 0:  # Channel 0, great!
                    # logger.debug('Found channel 0')
                    n = len(self.buffer)
                    if (
                        n >= 48
                    ):  # We need at least 3*16 bytes for one complete datapacket
                        logger.debug("Enough data to decode")
                        self.FLAG_VALID_BUFFER = True
                    # print('Found channel 0')
                    break
                else:  # throw data away and check next three bytes
                    self.buffer = self.buffer[3:]
                    [channel, data] = self.decode_HHL(self.buffer)

        # If we have a valid buffer, starts with channel 0 and has at least 48 bytes
        if self.FLAG_VALID_BUFFER:
            logger.debug("Processing rawdata")
            data_cat = []
            channel_cat = []
            while True:
                if len(self.buffer) < 48:  # We need at least 3 bytes to process
                    logger.debug("Not enough data left")
                    break
                else:  # Process data
                    data_tmp = []
                    for i in range(0, 16):
                        logger.debug("%d %d", i, len(self.buffer))
                        data_proc = self.decode_HHL(self.buffer[0:3])
                        if data_proc is not None:
                            channel = data_proc[0]
                            data = data_proc[1]
                        else:
                            logger.debug("Problem decoding %s", data_proc)
                            return None
                        self.buffer = self.buffer[3:]
                        channel_cat.append(channel)
                        if channel == i:
                            data_tmp.append(data)
                        else:
                            logger.debug("Bad channel %d %d", channel, i)
                            return None

                        # logger.debug('Channel %d', channel)
                        # logger.debug('data {:04x}'.format(data))

                    data_cat.append(data_tmp)
            channel_cat = np.asarray(channel_cat)
            data_cat = np.asarray(data_cat)
            data_return = [channel_cat, data_cat]
            # print('Data hhl',data_cat[-1])
            return data_return

valid_packet(data)

Check if the data packet has a valid HHL pattern.

Parameters

data : bytes Data to validate.

Returns

bool True if the first three bytes have the HHL pattern, False otherwise.

Source code in turban/instruments/mss/mss_hhl.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def valid_packet(self, data):
    """Check if the data packet has a valid HHL pattern.

    Parameters
    ----------
    data : bytes
        Data to validate.

    Returns
    -------
    bool
        True if the first three bytes have the HHL pattern, False otherwise.
    """
    if len(data) >= 2:
        # print('data',data,data[0:1])
        FLAG0 = data[0] & 0x01 == 1
        FLAG1 = data[1] & 0x01 == 1
        FLAG2 = data[2] & 0x01 == 0
        if FLAG0 and FLAG1 and FLAG2:
            return True
        else:
            return False
    else:
        return False

MicroRider common data classes (instruments/microrider/rsCommon.py)

MicroRider configuration parser (instruments/microrider/rsConfig_parser.py)

MicroRiderConfig

Class to handle MicroRider configuration from the raw setupstr read from the header information.

Parameters

str

string as found in the header of the data file, containing there configuration of the MicroRider.

The class provieds methods to query the configuration of a specific channel, as well as extracting the raw channel names, cruise and instrument information.

Example

cnf = MicroRiderConfig() cnf.parse(setupstring)

get the cruise information

cnf.get_config("cruise_info")

get the config of a given section...

cnf.get_config("channel08")

get the config of given channel name...

cnf.get_channel_config("sh1")

Source code in turban/instruments/microrider/rsConfig_parser.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class MicroRiderConfig:
    """Class to handle MicroRider configuration from the raw setupstr
    read from the header information.

    Parameters
    ----------

    setupstring : str
        string as found in the header of the data file, containing there
        configuration of the MicroRider.

    The class provieds methods to query the configuration of a specific
    channel, as well as extracting the raw channel names, cruise and
    instrument information.

    Example
    -------

    >>> cnf = MicroRiderConfig()
    >>> cnf.parse(setupstring)
    ### get the cruise information
    >>> cnf.get_config("cruise_info")
    ### get the config of a given section...
    >>> cnf.get_config("channel08")
    ### get the config of given channel name...
    >>> cnf.get_channel_config("sh1")
    """

    def __init__(self) -> None:
        self._config: dict[str, dict[str, Any]] = {}
        self._current_section: str = "root"
        self._number_of_channels: int = 0

    def parse(self, config_text: str) -> None:
        """
        Parse the configuration text into a nested dictionary.

        Parameters
        ----------
        config_text: str
            Configuration file contents as a string
        """
        # type hint parsed_value
        parsed_value: Any

        channel_index = 0
        # Initialize root section
        self._config["root"] = {}

        # Split the text into lines and process each line
        for line in config_text.split("\n"):
            # Remove leading/trailing whitespace
            line = line.strip()

            # Skip empty lines and comments
            if not line or line.startswith(";"):
                continue
            # replace Any tabs with spaces.
            line = line.replace("\t", " ")
            # Check for section header
            section_match = re.match(r"^\[(\w+)\]$", line)
            if section_match:
                section_name = section_match.group(1)
                # add an index number to all channel sections, as they
                # appear multiples times.
                if section_name == "channel":
                    self._current_section = f"channel{channel_index:02d}"
                    channel_index += 1
                    self._number_of_channels = channel_index
                else:
                    self._current_section = section_name
                self._config[self._current_section] = {}
                continue

            # Parse key-value pairs
            kv_match = re.match(r"(^.+?)\s*;.*$", line)
            if kv_match:  # trailing comment
                line = kv_match.group(1)  # discards comment and Any leading spaces.
            kv_match = re.match(r"^(\w+)\s*=\s*(.+)$", line)
            if kv_match:
                key, value = kv_match.groups()
                # Special handling for matrix rows
                if self._current_section == "matrix" and key.startswith("row"):
                    # Parse list of integers for matrix rows
                    try:
                        parsed_value = [int(x.strip()) for x in value.split()]
                    except ValueError:
                        raise ValueError(f"Invalid integer list for {key}")
                else:
                    # Parse single values (int, float, or string)
                    parsed_value = self._parse_single_value(value)

                self._config[self._current_section][key] = parsed_value

    @property
    def number_of_channels(self) -> int:
        return self._number_of_channels

    def _parse_single_value(self, value: str) -> Any:
        """
        Parse a single value to the most appropriate type.

        Parameters
        ----------
        value: str
             Value to parse

        Returns
        -------
        int | float | string
            Parsed value as int, float, or string
        """
        value = value.strip()

        # Try parsing as int
        try:
            return int(value)
        except ValueError:
            # Try parsing as float
            try:
                return float(value)
            except ValueError:
                # Return as string
                return value

    def get_config(self) -> dict[str, dict[str, Any]]:
        """
        Retrieve the parsed configuration.

        Returns
        -------
        dict[str, dict[str, str | int | float | list[int]]]
             Nested dictionary of configuration
        """
        return self._config

    def get_section(self, section: str = "root") -> dict[str, Any]:
        """
        Retrieve a specific section of the configuration.

        Parameters
        ----------
        section: str
             Section name (defaults to 'root')

        Returns
        -------
        dict[str, str | int | float | list[int]]
            Dictionary of key-value pairs in the section

        Raises
        ------
        KeyError
            If the section does not exist
        """
        return self._config[section]

    def get_channel_name_map(self) -> dict[str, str]:
        """
        Get a mapping of channel sensor name -> channel section name

        Returns
        -------
        dict[str, str]
        """
        channels = [k for k in self._config.keys() if k.startswith("channel")]
        channel_map = dict([(str(self.get_section(c)["name"]), c) for c in channels])
        return channel_map

    def get_channel_config(self, name: str) -> dict[str, Any] | None:
        """
        Get the configuration of a channel

        Parameters
        ----------
        name : str
            name of channel

        Returns
        -------
        dict[str, str | int | float | list[int]] | None
            a dictionary with configuration values or None if name is not found.
        """
        channel_map = self.get_channel_name_map()
        if name in channel_map.keys():
            return self.get_section(channel_map[name])
        else:
            return None

get_channel_config(name)

Get the configuration of a channel

Parameters

name : str name of channel

Returns

dict[str, str | int | float | list[int]] | None a dictionary with configuration values or None if name is not found.

Source code in turban/instruments/microrider/rsConfig_parser.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_channel_config(self, name: str) -> dict[str, Any] | None:
    """
    Get the configuration of a channel

    Parameters
    ----------
    name : str
        name of channel

    Returns
    -------
    dict[str, str | int | float | list[int]] | None
        a dictionary with configuration values or None if name is not found.
    """
    channel_map = self.get_channel_name_map()
    if name in channel_map.keys():
        return self.get_section(channel_map[name])
    else:
        return None

get_channel_name_map()

Get a mapping of channel sensor name -> channel section name

Returns

dict[str, str]

Source code in turban/instruments/microrider/rsConfig_parser.py
162
163
164
165
166
167
168
169
170
171
172
def get_channel_name_map(self) -> dict[str, str]:
    """
    Get a mapping of channel sensor name -> channel section name

    Returns
    -------
    dict[str, str]
    """
    channels = [k for k in self._config.keys() if k.startswith("channel")]
    channel_map = dict([(str(self.get_section(c)["name"]), c) for c in channels])
    return channel_map

get_config()

Retrieve the parsed configuration.

Returns

dict[str, dict[str, str | int | float | list[int]]] Nested dictionary of configuration

Source code in turban/instruments/microrider/rsConfig_parser.py
130
131
132
133
134
135
136
137
138
139
def get_config(self) -> dict[str, dict[str, Any]]:
    """
    Retrieve the parsed configuration.

    Returns
    -------
    dict[str, dict[str, str | int | float | list[int]]]
         Nested dictionary of configuration
    """
    return self._config

get_section(section='root')

Retrieve a specific section of the configuration.

Parameters

section: str Section name (defaults to 'root')

Returns

dict[str, str | int | float | list[int]] Dictionary of key-value pairs in the section

Raises

KeyError If the section does not exist

Source code in turban/instruments/microrider/rsConfig_parser.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def get_section(self, section: str = "root") -> dict[str, Any]:
    """
    Retrieve a specific section of the configuration.

    Parameters
    ----------
    section: str
         Section name (defaults to 'root')

    Returns
    -------
    dict[str, str | int | float | list[int]]
        Dictionary of key-value pairs in the section

    Raises
    ------
    KeyError
        If the section does not exist
    """
    return self._config[section]

parse(config_text)

Parse the configuration text into a nested dictionary.

Parameters

config_text: str Configuration file contents as a string

Source code in turban/instruments/microrider/rsConfig_parser.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def parse(self, config_text: str) -> None:
    """
    Parse the configuration text into a nested dictionary.

    Parameters
    ----------
    config_text: str
        Configuration file contents as a string
    """
    # type hint parsed_value
    parsed_value: Any

    channel_index = 0
    # Initialize root section
    self._config["root"] = {}

    # Split the text into lines and process each line
    for line in config_text.split("\n"):
        # Remove leading/trailing whitespace
        line = line.strip()

        # Skip empty lines and comments
        if not line or line.startswith(";"):
            continue
        # replace Any tabs with spaces.
        line = line.replace("\t", " ")
        # Check for section header
        section_match = re.match(r"^\[(\w+)\]$", line)
        if section_match:
            section_name = section_match.group(1)
            # add an index number to all channel sections, as they
            # appear multiples times.
            if section_name == "channel":
                self._current_section = f"channel{channel_index:02d}"
                channel_index += 1
                self._number_of_channels = channel_index
            else:
                self._current_section = section_name
            self._config[self._current_section] = {}
            continue

        # Parse key-value pairs
        kv_match = re.match(r"(^.+?)\s*;.*$", line)
        if kv_match:  # trailing comment
            line = kv_match.group(1)  # discards comment and Any leading spaces.
        kv_match = re.match(r"^(\w+)\s*=\s*(.+)$", line)
        if kv_match:
            key, value = kv_match.groups()
            # Special handling for matrix rows
            if self._current_section == "matrix" and key.startswith("row"):
                # Parse list of integers for matrix rows
                try:
                    parsed_value = [int(x.strip()) for x in value.split()]
                except ValueError:
                    raise ValueError(f"Invalid integer list for {key}")
            else:
                # Parse single values (int, float, or string)
                parsed_value = self._parse_single_value(value)

            self._config[self._current_section][key] = parsed_value

MicroRider channel converters (instruments/microrider/rsConversions.py)

Adis

Bases: object

The ADIS16209 inclinometer outputs words that combine data with status flags. The data and status flags must be seperated before the data can be converted into physical units. This function extracts the data and converts it into valid 16bit, 2s-compliment words.

ADIS16209 words have the following format:

bit15 = new data present
bit14 = error flag

bit[13:0] = X and Y inclination data, 2s-compliment (X and Y channels) bit[11:0] = temperature data, unsigned (temperature channel)

Source code in turban/instruments/microrider/rsConversions.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
class Adis(object):
    """
    The ADIS16209 inclinometer outputs words that combine data with status flags.
    The data and status flags must be seperated before the data can be converted
    into physical units.  This function extracts the data and converts it into
    valid 16bit, 2s-compliment words.

    ADIS16209 words have the following format:

        bit15 = new data present
        bit14 = error flag
    bit[13:0] = X and Y inclination data, 2s-compliment (X and Y channels)
    bit[11:0] = temperature data, unsigned              (temperature channel)
    """

    def __init__(self) -> None:
        # self.errorFlag : np.typing.NDArray[np.int16] = np.array([], dtype=np.int16)
        # self.oldFlag : np.typing.NDArray[np.int16] = np.array([], dtype=np.int16)
        pass

    def convert(self, v: np.typing.NDArray[np.int16]) -> np.typing.NDArray[np.float64]:
        u = v.astype(">u2")  # unsigned 16 bit integer
        b = np.bitwise_and(u, 1 << 15) // (1 << 15)  # b==1: new data b==0: old data
        # self.oldFlag = np.where(b==0)[0]
        b = np.bitwise_and(u, 1 << 14) // (1 << 14)  # b==1: error b==0: no error
        # self.errorFlag = np.where(b==1)[0]
        b = np.bitwise_and(u, (1 << 13) + (1 << 12))
        v_raw: np.typing.NDArray[np.int16] | np.typing.NDArray[np.uint16]
        if np.all(b == 0):
            # dtype = '>u2' # big endian unsigned int 2 byte (16 bit)
            v_raw = np.bitwise_and(u, (1 << 13) - 1)
            v_raw -= np.bitwise_and(u, 1 << 13)  # two's compliment.
            v_raw = v_raw.astype(np.uint16)
        else:
            # dtype = '>i2' # big endian signed int 2 byte (16 bit)
            v_raw = np.bitwise_and(u, (1 << 13) - 1)
            v_raw -= np.bitwise_and(u, 1 << 13)  # two's compliment.
            v_raw = v_raw.astype(np.int16)
        return v_raw.astype(np.float64)

Aem1g_a

Bases: Converter

Specific converter method for Aem1g_a type channels

Source code in turban/instruments/microrider/rsConversions.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class Aem1g_a(Converter):
    """Specific converter method for Aem1g_a type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigU_EM(bias=0.0, units="[ m s^{-1} ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        bias = self.get_parameter("bias")
        adc_fs = self.get_parameter("adc_fs")
        adc_bits = self.get_parameter("adc_bits")
        adc_zero = self.get_optional_parameter("adc_zero")
        a = self.get_parameter("a") / 100  # cm/s -> m/s
        b = self.get_parameter("b") / 100  # cm/s -> m/s
        v_unit: np.typing.NDArray[np.float64]
        if adc_zero is None:
            adc_zero = adc_fs / 2
        v_unit = adc_zero + v * adc_fs / 2**adc_bits
        v_unit = a + b * v_unit
        v_unit -= bias
        if b < 1:
            m = """Provided coefficients (a,b) are not correct.  Each
instrument provides two sets of calibration results 
- one for analog output and one for digital output. 
The analog values should be used for this channel type."""
            logger.warning(m)
        return v_unit

Aem1g_d

Bases: Converter

Specific converter method for Aem1g_d type channels

Source code in turban/instruments/microrider/rsConversions.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class Aem1g_d(Converter):
    """Specific converter method for Aem1g_d type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigU_EM(bias=0.0, units="[ m s^{-1} ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        u = v.astype(">u2")  # unsigned 16 bit integer
        a = self.get_parameter("a") / 100  # cm/s -> m/s
        b = self.get_parameter("b") / 100  # cm/s -> m/s
        bias = self.get_parameter("bias")
        v_unit: np.typing.NDArray[np.float64]
        v_unit = a + b * u
        v_unit -= bias
        if b > 1:
            m = """Provided coefficients (a,b) are not correct.  Each
instrument provides two sets of calibration results 
- one for analog output and one for digital output. 
The digital values should be used for this channel type."""
            logger.warning(m)
        return v_unit

Converter

Bases: ABC

Base class for detailed converter classes

Parameters

config : ChannelConfig channel configuration dataclass

Source code in turban/instruments/microrider/rsConversions.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class Converter(ABC):
    """Base class for detailed converter classes

    Parameters
    ----------
    config : ChannelConfig
        channel configuration dataclass
    """

    def __init__(self, config: common.ChannelConfigABC) -> None:
        self.config: common.ChannelConfigABC = config
        self.defaults: common.ChannelConfigABC

    def __call__(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        return self.convert(v)

    def get_parameter(self, p: str) -> int | float:
        """Looks up a parameter setting from the configuration

        Parameters
        ----------
        p : string
            parameter name

        Returns
        float
            value of parameter setting.

        The method returs the value set in the configuration, or a default value when no
        value is configured. If also no default value is given, a ValueError is raised.
        """
        # return the value if it is configured
        if self.config.is_set(p):
            return_value = getattr(self.config, p)
            if isinstance(return_value, int) or isinstance(return_value, float):
                return return_value
            else:
                raise ValueError("Unexpected config value encountered.")
        # not configured, return the value from the defaults, if configured
        if self.defaults.is_set(p):
            return_value = getattr(self.defaults, p)
            if isinstance(return_value, int) or isinstance(return_value, float):
                return return_value
            else:
                raise ValueError("Unexpected default value encountered.")
        else:
            raise ValueError(
                f"Parameter {p} is required, but not given in config or default dict"
            )

    def get_optional_parameter(self, p: str) -> int | float | None:
        """Looks up a parameter setting from the configuration

        Parameters
        ----------
        p : string
            parameter name

        Returns
        float
            value of parameter setting.

        The method returs the value set in the configuration, or a default value when no
        value is configured. If also no default value is given None is returned.
        """
        try:
            return_value = self.get_parameter(p)
        except ValueError:
            return_value = None
        return return_value

    @abstractmethod
    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        pass

get_optional_parameter(p)

Looks up a parameter setting from the configuration

Parameters

p : string parameter name

Returns float value of parameter setting.

The method returs the value set in the configuration, or a default value when no value is configured. If also no default value is given None is returned.

Source code in turban/instruments/microrider/rsConversions.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_optional_parameter(self, p: str) -> int | float | None:
    """Looks up a parameter setting from the configuration

    Parameters
    ----------
    p : string
        parameter name

    Returns
    float
        value of parameter setting.

    The method returs the value set in the configuration, or a default value when no
    value is configured. If also no default value is given None is returned.
    """
    try:
        return_value = self.get_parameter(p)
    except ValueError:
        return_value = None
    return return_value

get_parameter(p)

Looks up a parameter setting from the configuration

Parameters

p : string parameter name

Returns float value of parameter setting.

The method returs the value set in the configuration, or a default value when no value is configured. If also no default value is given, a ValueError is raised.

Source code in turban/instruments/microrider/rsConversions.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def get_parameter(self, p: str) -> int | float:
    """Looks up a parameter setting from the configuration

    Parameters
    ----------
    p : string
        parameter name

    Returns
    float
        value of parameter setting.

    The method returs the value set in the configuration, or a default value when no
    value is configured. If also no default value is given, a ValueError is raised.
    """
    # return the value if it is configured
    if self.config.is_set(p):
        return_value = getattr(self.config, p)
        if isinstance(return_value, int) or isinstance(return_value, float):
            return return_value
        else:
            raise ValueError("Unexpected config value encountered.")
    # not configured, return the value from the defaults, if configured
    if self.defaults.is_set(p):
        return_value = getattr(self.defaults, p)
        if isinstance(return_value, int) or isinstance(return_value, float):
            return return_value
        else:
            raise ValueError("Unexpected default value encountered.")
    else:
        raise ValueError(
            f"Parameter {p} is required, but not given in config or default dict"
        )

Deconvolve

Bases: object

Description


Deconvolve a vector of pre-emphasized data (temperature, conductivity, or pressure) to yield high-resolution data. The pre-emphasized signal (x+gain*dx/dt) is low-pass filtered using appropriate initial conditions after the method described in Mudge and Lueck, 1994.

For pressure, you must pass both 'X' and 'X_dX' to this function. Both vectors are needed to make a good estimate of the initial conditions for filtering. Initial conditions are very important for pressure because the gain is usually $\SI{\sim20}{\s}$, and errors caused by a poor choice of initial conditions can last for several hundred seconds! In addition, the high-resolution pressure is linearly adjusted to match the low-resolution pressure so that the factory calibration coefficients can later be used to convert this signal into physical units.

The gains for all signal types are provided in the calibration report of your instrument.

Examples

C1_hres = deconvolve( 'C1_dC1', [], C1_dC1, fs_fast, setupfilestr )

Deconvolve the micro-conductivity channel using the diff_gain parameter from the 'C1_dC1' channel section found within the supplied configuration string. Because there is no channel without pre-emphasis, the argument is left empty.

T1_hres = deconvolve( '', T1, T1_dT1, fs_fast, 1.034 )

Deconvole the thermistor data using both channels with and without pre-emphasis. Using both channels improves the initial deconvolution accuracy and compensates for slight variations in the calibration coefficients between the two channels. Note the explicitly provided value for diff_gain. One typically provides the configuration string and channel name as shown in the previous example.

Mudge, T.D. and R.G. Lueck, 1994: Digital signal processing to enhance oceanographic observations, J. Atmos. Oceanogr. Techn., 11, 825-836.

@image @images/pressure_deconvolution1 @Deconvolution example. @The green curve is from the normal pressure channel, and the blue curve is derived from the pre-emphasized pressure channel. This data is from a profiler that has impacted the soft bottom of a lake. Both signals are shown with full bandwidth (0 - 32 Hz) without any smoothing. The full-scale range of the pressure transducer is 500 dBar.

@image @images/pressure_deconvolution2 @Deconvolution example two. @Same as previous figure but with zoom-in on only the high-resolution pressure. Again, full bandwidth without any smoothing.

@image @images/pressure_deconvolution3 @The rate of change of pressure derived from the normal pressure signal and the high-resolution pressure signals using the gradient function. @Full bandwidth signals of the rate of change of pressure. The normal pressure signal (green) produces a fall-rate that is useless without extensive smoothing because it is not even monotonic. The rate of change of the high-resolution pressure (blue) is smooth, always positive, and, therefore, the high-resolution pressure itself, can be used directly for plotting other variables as a function pressure (or depth). The high-resolution rate of change of pressure has been multiplied by 10 for visual clarity. The fall-rate is about $\SI{0.17}{\m\per\s}$.

Source code in turban/instruments/microrider/rsConversions.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class Deconvolve(object):
    r"""Description
      -----------
    Deconvolve a vector of pre-emphasized data (temperature, conductivity, or
    pressure) to yield high-resolution data. The pre-emphasized signal
    (x+gain*dx/dt) is low-pass filtered using appropriate initial conditions
    after the method described in Mudge and Lueck, 1994.

    For pressure, you must pass both 'X' and 'X_dX' to this function. Both
    vectors are needed to make a good estimate of the initial conditions for
    filtering. Initial conditions are very important for pressure because the
    gain is usually $\SI{\sim20}{\s}$, and errors caused by a poor choice of initial
    conditions can last for several hundred seconds! In addition, the
    high-resolution pressure is linearly adjusted to match the low-resolution
    pressure so that the factory calibration coefficients can later be used to
    convert this signal into physical units.

    The gains for all signal types are provided in the calibration report of
    your instrument.

      Examples

       >> C1_hres = deconvolve( 'C1_dC1', [], C1_dC1, fs_fast, setupfilestr )

    Deconvolve the micro-conductivity channel using the diff_gain parameter
    from the 'C1_dC1' channel section found within the supplied configuration
    string.  Because there is no channel without pre-emphasis, the argument is
    left empty.

       >> T1_hres = deconvolve( '', T1, T1_dT1, fs_fast, 1.034 )

    Deconvole the thermistor data using both channels with and without
    pre-emphasis.  Using both channels improves the initial deconvolution
    accuracy and compensates for slight variations in the calibration
    coefficients between the two channels. Note the explicitly provided value
    for diff_gain. One typically provides the configuration string and
    channel name as shown in the previous example.

    Mudge, T.D. and R.G. Lueck, 1994: Digital signal processing to enhance
    oceanographic observations, _J. Atmos. Oceanogr. Techn._, 11, 825-836.

    @image @images/pressure_deconvolution1 @Deconvolution example. @The green
    curve is from the normal pressure channel, and the blue curve is derived from
    the pre-emphasized pressure channel. This data is from a profiler that has impacted
    the soft bottom of a lake. Both signals are shown with full bandwidth (0 - 32 Hz)
    without any smoothing. The full-scale range of the pressure transducer is 500 dBar.

    @image @images/pressure_deconvolution2 @Deconvolution example two. @Same
    as previous figure but with zoom-in on only the high-resolution pressure. Again,
    full bandwidth without any smoothing.

    @image @images/pressure_deconvolution3 @The rate of change of pressure derived
    from the normal pressure signal and the high-resolution pressure signals using
    the gradient function. @Full bandwidth signals of the rate of change of
    pressure. The normal pressure signal (green) produces a fall-rate that is
    useless without extensive smoothing because it is not even
    monotonic. The rate of change of the high-resolution pressure (blue) is smooth, always
    positive, and, therefore, the high-resolution pressure itself, can be used directly for
    plotting other variables as a function pressure (or depth). The
    high-resolution rate of change of pressure has been multiplied by 10 for visual
    clarity. The fall-rate is about $\SI{0.17}{\m\per\s}$."""

    def __init__(
        self,
        X_dX: np.typing.NDArray[np.float64],
        X: np.typing.NDArray[np.float64],
        fs: float,
        diff_gain: float,
    ):
        self.fs = fs
        self.diff_gain = diff_gain
        X = self.interpolate(X_dX, X)
        self.X_hires = self.bw_filter(X_dX, X)
        self.X = X

    def bw_filter(
        self, X_dX: np.typing.NDArray[np.float64], X: np.typing.NDArray[np.float64]
    ) -> np.typing.NDArray[np.float64]:
        fc = 1 / (2 * np.pi * self.diff_gain)
        b, a = ss.butter(1, fc / (self.fs / 2))
        if not X is None:
            p = np.polyfit(X, X_dX, 1)
            if p[0] < -0.5:
                X_dX = -X_dX
            z = ss.lfiltic(b, a, X[:1], X_dX[:1])
        else:
            tm = np.arange(0, 2 * self.diff_gain, 1 / self.fs)
            p = np.polyfit(tm, X_dX[: tm.shape[0]], 1)
            previousOutput = np.array([p[1] - self.diff_gain * p[0]])
            z = ss.lfiltic(b, a, previousOutput, X_dX[:1])
        X_hires, zf = ss.lfilter(b, a, X_dX, zi=z)

        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        # %%%%% For the pressure, regress against the low-resolution vector to %%%%%
        # %%%%% remove the small offset in the derivative circuit.             %%%%%
        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

        #  Algorithm for signals that have both with and without pre-emphasis data.
        #  1) Interpolate the non pre-emphasis signal so we can perform a polyfit.
        #  2) Deconvolve using the first "X" data point for the initial condition.
        #  3) Polyfit the deconvolved signal to the non pre-emphasis signal.
        #  4) Calculate new initial conditions after applying an offset to X based
        #     from the polyfit applied in step 3.
        #  5) Perform a new deconvolve with updated initial conditions.
        #  6) Apply the polynomial from step 3 to the new data.  A new polyfit is
        #     not required - they will be almost identicle.

        if not X is None:
            p = np.polyfit(X_hires, X, 1)
            p2 = np.array([2 - p[0], -p[1]])
            initialOutput = np.polyval(p2, X[:1])
            z = ss.lfiltic(b, a, initialOutput, X_dX[:1])
            X_hires, zf = ss.lfilter(b, a, X_dX, zi=z)
            X_hires = np.polyval(p, X_hires).astype(np.float64)
        return X_hires

    def interpolate(
        self, X_dX: np.typing.NDArray[np.float64], X: np.typing.NDArray[np.float64]
    ) -> typing.Any:
        # only interpolate if X and X_dX are not of equal length
        if X is None or (X.shape == X_dX.shape):
            return X  # nothing to do
        f_slow = self.fs * X.shape[0] / X_dX.shape[0]
        t_slow = np.arange(X.shape[0]) / f_slow
        t_fast = np.arange(X_dX.shape[0]) / self.fs
        ifun = si_PchipInterpolator(t_slow, X, extrapolate=True)
        return ifun(t_fast)

Gnd

Bases: Converter

Specific converter method for GND type channels

Does not do any conversion.

Source code in turban/instruments/microrider/rsConversions.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class Gnd(Converter):
    """Specific converter method for GND type channels

    Does not do any conversion.
    """

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfig(units="[ counts ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        return v.astype(np.float64)

Incl

Bases: Converter

Specific converter method common to attitude type channels

Source code in turban/instruments/microrider/rsConversions.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class Incl(Converter):
    """Specific converter method common to attitude type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.adis = Adis()

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        coefs = [self.get_parameter("coef1"), self.get_parameter("coef0")]
        if v.dtype == ">i2":
            v = v.astype(np.int16)
        if not v.dtype == np.int16:
            raise ValueError("Unexpected datatype encountered.")
        v_raw = self.adis.convert(v.astype(np.int16))
        # old data and errors are all ignored. Should we set all those elements to nan?
        v_unit = np.polyval(coefs, v_raw).astype(np.float64)
        return v_unit

InclT

Bases: Incl

Specific converter method for attitude's temperature type channels

Source code in turban/instruments/microrider/rsConversions.py
262
263
264
265
266
267
268
269
class InclT(Incl):
    """Specific converter method for attitude's temperature type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigInclinometer(
            coef0=624.0, coef1=-0.47, units="[ °C ]"
        )

InclXY

Bases: Incl

Specific converter method for attitude type channels

Source code in turban/instruments/microrider/rsConversions.py
254
255
256
257
258
259
class InclXY(Incl):
    """Specific converter method for attitude type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigInclinometer(units="[ ° ]")

PassThrough

Bases: Converter

Pass through converter

Source code in turban/instruments/microrider/rsConversions.py
329
330
331
332
333
334
335
336
337
338
339
class PassThrough(Converter):
    """Pass through converter"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfig()

    def convert(
        self, v: np.typing.NDArray[np.int16] | np.typing.NDArray[np.float64]
    ) -> np.typing.NDArray[np.float64]:
        return v.astype(np.float64)

Piezo

Bases: Converter

Specific converter method for Piezo type channels

Source code in turban/instruments/microrider/rsConversions.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class Piezo(Converter):
    """Specific converter method for Piezo type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigPiezo(a0=0.0, units="[ counts ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        a0 = self.get_parameter("a0")
        v_unit = v.astype(np.float64) - a0
        return v_unit

Poly

Bases: Converter

Specific converter method for polynomial type channels

Source code in turban/instruments/microrider/rsConversions.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class Poly(Converter):
    """Specific converter method for polynomial type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigPressure(units=" ")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        polyVals: list[float | int] = []
        for i in range(9):
            p = self.get_optional_parameter(f"coef{i:d}")
            if p is None:
                break
            polyVals.insert(0, p)
        v_unit = np.polyval(polyVals, v).astype(np.float64)
        return v_unit

Shear

Bases: Converter

Specific converter method for shear type channels

Source code in turban/instruments/microrider/rsConversions.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class Shear(Converter):
    """Specific converter method for shear type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigShear(
            adc_zero=0.0, sig_zero=0.0, units="[ s^{-1} ]"
        )

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        v = v.astype(np.float64)
        adc_zero = self.get_parameter("adc_zero")
        sig_zero = self.get_parameter("sig_zero")
        adc_fs = self.get_parameter("adc_fs")
        adc_bits = self.get_parameter("adc_bits")
        diff_gain = self.get_parameter("diff_gain")
        sens = self.get_parameter("sens")
        v_unit = (adc_fs / 2**adc_bits) * v + (adc_zero - sig_zero)
        v_unit /= diff_gain * sens * 2 * float(np.sqrt(2))
        return v_unit

Therm

Bases: Converter

Source code in turban/instruments/microrider/rsConversions.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Therm(Converter):
    def __init__(self, config: common.ChannelConfigABC):
        """Specific converter method for thermistor type channels"""

        super().__init__(config)
        self.defaults = common.ChannelConfigThermistor(units="[ °C ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        a = self.get_parameter("a")
        b = self.get_parameter("b")
        adc_fs = self.get_parameter("adc_fs")
        adc_bits = self.get_parameter("adc_bits")
        g = self.get_parameter("g")
        e_b = self.get_parameter("e_b")
        t_0 = self.get_parameter("t_0")

        Z = ((v - a) / b) * (adc_fs / 2**adc_bits) * 2 / (g * e_b)
        # Avoid taking log of negative numbers in case of broken sensor:
        Z = Z.clip(-0.6, 0.6, out=Z)
        R = (1 - Z) / (1 + Z)
        log_R = np.log(R)
        beta = self.get_optional_parameter("beta") or self.get_optional_parameter(
            "beta_1"
        )
        if beta is None:
            logger.error("No beta or beta_1 for this thermistor")
            ValueError("No beta or beta_1 for this thermistor")
        else:
            physical = 1 / t_0 + (1 / beta) * log_R

        beta_2 = self.get_optional_parameter("beta_2")
        if not beta_2 is None:
            physical += (1 / beta_2) * log_R**2

        beta_3 = self.get_optional_parameter("beta_3")
        if not beta_3 is None:
            physical += (1 / beta_3) * log_R**3

        physical = 1 / physical - 273.15
        return physical

__init__(config)

Specific converter method for thermistor type channels

Source code in turban/instruments/microrider/rsConversions.py
128
129
130
131
132
def __init__(self, config: common.ChannelConfigABC):
    """Specific converter method for thermistor type channels"""

    super().__init__(config)
    self.defaults = common.ChannelConfigThermistor(units="[ °C ]")

Voltage

Bases: Converter

Specific converter method for voltage type channels

Source code in turban/instruments/microrider/rsConversions.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class Voltage(Converter):
    """Specific converter method for voltage type channels"""

    def __init__(self, config: common.ChannelConfigABC):
        super().__init__(config)
        self.defaults = common.ChannelConfigVoltage(adc_zero=0.0, g=1.0, units="[ V ]")

    def convert(
        self, v: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16]
    ) -> np.typing.NDArray[np.float64]:
        adc_zero = self.get_parameter("adc_zero")
        gain = self.get_parameter("g")
        adc_fs = self.get_parameter("adc_fs")
        adc_bits = self.get_parameter("adc_bits")
        v_unit = (adc_zero + v * adc_fs / 2**adc_bits) / gain
        return v_unit

get_converter(channel_config)

Converter factory

Parameters

channel_config : ChannelConfig channel configuration dataclass

Returns

Converter object, relevant for the requested channel type.

Note : this code uses the match/case structure and requires therefore python 3.10+

Source code in turban/instruments/microrider/rsConversions.py
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
def get_converter(channel_config: common.ChannelConfigABC) -> type[Converter]:
    """Converter factory

    Parameters
    ----------
    channel_config : ChannelConfig
        channel configuration dataclass

    Returns
    -------
    Converter object, relevant for the requested channel type.

    Note : this code uses the match/case structure and requires therefore python 3.10+
    """
    converter: type[Converter]
    try:
        sensor_type = channel_config.type
    except KeyError:
        sensor_type = "none"
    match sensor_type:
        case "piezo":
            converter = Piezo
        case "gnd":
            converter = Gnd
        case "therm":
            converter = Therm
        case "shear":
            converter = Shear
        case "poly":
            converter = Poly
        case "voltage":
            converter = Voltage
        case "inclxy":
            converter = InclXY
        case "inclt":
            converter = InclT
        case "aem1g_a":
            converter = Aem1g_a
        case "aem1g_d":
            converter = Aem1g_d
        case "none":
            converter = PassThrough
        case _:
            converter = PassThrough

    return converter

MicroRider binary I/O (instruments/microrider/rsIO.py)

Channel

Bases: object

Channel object

Parameters

name : string name of channel config : ChannelConfig channel configuration object (from setupstr) deconvolved : bool (False) flag to indicate whether this is a deconvolved channel.

This class holds the data of each channel: when read : the raw data are in .data (._data is None) when converted: the converted data are in .data and the raw data in ._data.

The following methods are provided:

copy(): this returns a new channel, but keeps the same configuration, and sets the deconvolved flag. This method is used to create new channels for the high resolution parameters

correct_sign(): sign correction (u2 -> i2) if configuration dictates.

convert_to_units(): applies to associated conversion algortithm (determined from the config dictionary) to the raw data.

join(): joins even and odd channels. Warning not fully tested. This method can be called by through the + operator: ch = chE+chO

Source code in turban/instruments/microrider/rsIO.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class Channel(object):
    """Channel object

    Parameters
    ----------
    name : string
        name of channel
    config : ChannelConfig
        channel configuration object (from setupstr)
    deconvolved : bool (False)
        flag to indicate whether this is a deconvolved channel.

    This class holds the data of each channel:
    when read     : the raw data are in .data (._data is None)
    when converted: the converted data are in .data and the raw data in ._data.

    The following methods are provided:

    copy(): this returns a new channel, but keeps the same configuration,
            and sets the deconvolved flag. This method is used to create
            new channels for the high resolution parameters

    correct_sign(): sign correction (u2 -> i2) if configuration dictates.

    convert_to_units(): applies to associated conversion algortithm (determined
           from the config dictionary) to the raw data.

    join(): joins even and odd channels. Warning not fully tested. This method
           can be called by through the + operator: ch = chE+chO


    """

    def __init__(self, channel_config: ChannelConfigABC, deconvolved: bool = False):
        self.name: str = channel_config.name
        self.config: ChannelConfigABC = channel_config
        self.data: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16] = (
            np.array([])
        )
        self._data: np.typing.NDArray[np.float64] | np.typing.NDArray[np.int16] = (
            np.array([])
        )
        self.converter = rsConversions.get_converter(channel_config)(channel_config)
        self.converted_into_units: bool = False
        self.deconvolved: bool = deconvolved

    def __add__(self, rhs: "Channel") -> "Channel":
        return self.join(self, rhs)

    def copy(self, new_name: str = "", deconvolved: bool = True) -> "Channel":
        """Copies configuration into a new channel with optionally a new name

        Parameters
        ----------
        new_name : string (None)
            gives this name to the new channel. If None, it keeps the original name.

        Returns
        -------
        Channel object
            Empty channel, with the configuration copied, and the deconvolved flag set.
        """
        name = new_name or self.name
        config = self.config.copy()
        return Channel(config, deconvolved=deconvolved)

    def correct_sign(self) -> None:
        """Corrects sign of data (if configuration dictates)

        Some channels record in unsigned integers. If required they need to be
        signed, where the msb is used as sign.

        """
        types_to_skip = "sbt sbc jac_c jac_t o2_43f".split()
        cfg = self.config
        if cfg.id == 255:
            return  # nothing to do for ch255
        if cfg.type in types_to_skip:
            return
        if cfg.sign == "unsigned":
            logger.info("Correcting sign")
            self.data = self.data.astype(np.dtype(f">u{HeaderEnum.WordSize}"))  # as unsigned
        else:
            idx = np.where(self.data >= 2**31)[0]
            if len(idx):
                self.data[idx] -= 2**32
                logger.info("Correcting sign of 32 bit channel.")
                logger.debug("Does this ever happen?")
                raise ValueError("FIX ME")

    def convert_to_units(self) -> None:
        """Converts channel into unit bearing data

        The attribute .data is copied to ._data (raw data) and
        converted using a conversion method associated with the
        type field of the configuration.

        Sets the converted_into_units flag.
        """
        if not self.converted_into_units:
            self._data = self.data
            self.data = self.converter(self._data)
            self.converted_into_units = True
        else:
            raise ValueError(
                "Refusing to convert to units for a parameter already converted."
            )

    def join(self, even_channel: "Channel", odd_channel: "Channel") -> "Channel":
        """Joins even and odd channel

        Parameters
        ---------
        even_channel : Channel
            Channel with the even signal (16 bit integer)
        odd_channel : Channel
            Channel with the odd signal (16 bit integer)

        Returns
        -------
        Channel
            Channel with combined channel (32 bit)

        Note: Not fully tested.
        """
        chE = even_channel.data.astype(
            np.dtype(f">u{HeaderEnum.WordSize}")
        )  # as unsigned.
        chO = odd_channel.data.astype(
            np.dtype(f">u{HeaderEnum.WordSize}")
        )  # as unsigned
        chEO = np.zeros(
            chE.shape, dtype=f">u{HeaderEnum.WordSize*2}"
        )  # unsigned, but 32 bit
        chEO[...] = (
            chO * 2**16 + chE
        )  # because chO is 16 bit we cannot shift 16 bits...
        even_channel.config.name = even_channel.name.replace("_E", "")
        channel = Channel(even_channel.config)
        channel.data = chEO
        return channel

convert_to_units()

Converts channel into unit bearing data

The attribute .data is copied to ._data (raw data) and converted using a conversion method associated with the type field of the configuration.

Sets the converted_into_units flag.

Source code in turban/instruments/microrider/rsIO.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def convert_to_units(self) -> None:
    """Converts channel into unit bearing data

    The attribute .data is copied to ._data (raw data) and
    converted using a conversion method associated with the
    type field of the configuration.

    Sets the converted_into_units flag.
    """
    if not self.converted_into_units:
        self._data = self.data
        self.data = self.converter(self._data)
        self.converted_into_units = True
    else:
        raise ValueError(
            "Refusing to convert to units for a parameter already converted."
        )

copy(new_name='', deconvolved=True)

Copies configuration into a new channel with optionally a new name

Parameters

new_name : string (None) gives this name to the new channel. If None, it keeps the original name.

Returns

Channel object Empty channel, with the configuration copied, and the deconvolved flag set.

Source code in turban/instruments/microrider/rsIO.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def copy(self, new_name: str = "", deconvolved: bool = True) -> "Channel":
    """Copies configuration into a new channel with optionally a new name

    Parameters
    ----------
    new_name : string (None)
        gives this name to the new channel. If None, it keeps the original name.

    Returns
    -------
    Channel object
        Empty channel, with the configuration copied, and the deconvolved flag set.
    """
    name = new_name or self.name
    config = self.config.copy()
    return Channel(config, deconvolved=deconvolved)

correct_sign()

Corrects sign of data (if configuration dictates)

Some channels record in unsigned integers. If required they need to be signed, where the msb is used as sign.

Source code in turban/instruments/microrider/rsIO.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def correct_sign(self) -> None:
    """Corrects sign of data (if configuration dictates)

    Some channels record in unsigned integers. If required they need to be
    signed, where the msb is used as sign.

    """
    types_to_skip = "sbt sbc jac_c jac_t o2_43f".split()
    cfg = self.config
    if cfg.id == 255:
        return  # nothing to do for ch255
    if cfg.type in types_to_skip:
        return
    if cfg.sign == "unsigned":
        logger.info("Correcting sign")
        self.data = self.data.astype(np.dtype(f">u{HeaderEnum.WordSize}"))  # as unsigned
    else:
        idx = np.where(self.data >= 2**31)[0]
        if len(idx):
            self.data[idx] -= 2**32
            logger.info("Correcting sign of 32 bit channel.")
            logger.debug("Does this ever happen?")
            raise ValueError("FIX ME")

join(even_channel, odd_channel)

Joins even and odd channel

Parameters

even_channel : Channel Channel with the even signal (16 bit integer) odd_channel : Channel Channel with the odd signal (16 bit integer)

Returns

Channel Channel with combined channel (32 bit)

Note: Not fully tested.

Source code in turban/instruments/microrider/rsIO.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def join(self, even_channel: "Channel", odd_channel: "Channel") -> "Channel":
    """Joins even and odd channel

    Parameters
    ---------
    even_channel : Channel
        Channel with the even signal (16 bit integer)
    odd_channel : Channel
        Channel with the odd signal (16 bit integer)

    Returns
    -------
    Channel
        Channel with combined channel (32 bit)

    Note: Not fully tested.
    """
    chE = even_channel.data.astype(
        np.dtype(f">u{HeaderEnum.WordSize}")
    )  # as unsigned.
    chO = odd_channel.data.astype(
        np.dtype(f">u{HeaderEnum.WordSize}")
    )  # as unsigned
    chEO = np.zeros(
        chE.shape, dtype=f">u{HeaderEnum.WordSize*2}"
    )  # unsigned, but 32 bit
    chEO[...] = (
        chO * 2**16 + chE
    )  # because chO is 16 bit we cannot shift 16 bits...
    even_channel.config.name = even_channel.name.replace("_E", "")
    channel = Channel(even_channel.config)
    channel.data = chEO
    return channel

ChannelMatrix

Bases: object

Data class to store the sensor matrix

Parameters

microrider_config : dict[str, str|int|float|list[int]] dictionary holding the MR configuration

Source code in turban/instruments/microrider/rsIO.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class ChannelMatrix(object):
    """Data class to store the sensor matrix

    Parameters
    ----------
    microrider_config : dict[str, str|int|float|list[int]]
        dictionary holding the MR configuration

    """

    def __init__(self, microrider_config: rsConfig_parser.MicroRiderConfig) -> None:
        self.matrix = self._get_matrix(microrider_config)
        self.number_of_elements = np.prod(self.matrix.shape)
        self.channels = self._create_channels(microrider_config)

    def _get_matrix(
        self, microrider_config: rsConfig_parser.MicroRiderConfig
    ) -> np_typing.NDArray[np.int64]:
        matrix = []
        matrix_section = microrider_config.get_section("matrix")
        # we don't always have the number of rows given anymore.
        # Find out ourselves
        n = 0
        while True:
            k = f"row{n+1:02d}"  # Here they count from 1, for variation I suppose.
            if not k in matrix_section:
                break
            s = matrix_section[k]
            matrix.append(s)
            n += 1
        logger.info("Address matrix:")
        for row in matrix:
            _s = " ".join([f"{i:3d}" for i in list(row)])
            logger.info(f"| {_s} |")
        return_value = np.array(matrix)
        return np.array(matrix)

    def _create_channels(
        self, microrider_config: rsConfig_parser.MicroRiderConfig
    ) -> dict[str, Channel]:
        """ """
        channels: dict[str, Channel] = {}

        logger.info("Available channels:")
        for n in range(microrider_config.number_of_channels):
            section_name = f"channel{n:02d}"
            section_dict = microrider_config.get_section(section_name)
            channel_config = self._create_channel_config(section_dict)
            channel_name = channel_config.name
            channels[channel_name] = Channel(channel_config)
            logger.info(f"\t{n:2d}: {channel_name}")
        if np.any(self.matrix == 255):
            logger.debug("Created Channel 255...")
            channel_config = ChannelConfig(name="ch255", id=255, type="")
            channels["ch255"] = Channel(channel_config)
            n += 1
            logger.info(f"\t{n:2d}: ch255")
        return channels

    def _create_channel_config(self, section: dict[str, Any]) -> ChannelConfigABC:
        name = section["name"]
        _ChannelConfig = channel_config_factory(name)
        channel_config = _ChannelConfig(name)
        for k_any_case, v in section.items():
            k = k_any_case.lower()
            if k == "name":
                continue
            channel_config.update(k, v)
        return channel_config

HeaderParser

Source code in turban/instruments/microrider/rsIO.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class HeaderParser:
    def parse(self, fd: io.BufferedReader) -> ByteHeader:
        """Parses header block of a data record. It is assumed that the block is HeaderEnum.HeaderSize large.

        Parameters
        ----------
        fd : io.BufferedReader
            file descriptor pointing to an open file

        Returns
        -------
        ByteHeader:
            a dataclass with (byte) header information
        """
        block = fd.read(HeaderEnum.HeaderSize)
        match self.__get(HeaderEnum.Endianness, block=block, fmt="<H"):
            case 0:
                get = partial(self.__get, block=block, fmt="<H")
            case 1 | 256:  # little endian
                get = partial(self.__get, block=block, fmt="<H")
            case 2 | 512:  # big endian
                get = partial(self.__get, block=block, fmt=">H")
            case _:
                raise ValueError(
                    "Failed to determine the endianness of this block of data."
                )

        # From the manual...
        byte_header = ByteHeader(
            file_number=get(0),
            record_number=get(1),
            record_number_serial_port=get(2),
            year=get(3),
            month=get(4),
            day=get(5),
            hour=get(6),
            minute=get(7),
            second=get(8),
            millisecond=get(9),
            header_version=float(get(10) >> 8) + float((get(10) & 0x0F)) / 1000,
            setupfile_size=get(11),
            product_ID=get(12),
            build_number=get(13),
            timezone_in_minutes=get(14),
            buffer_status=get(15),
            restarted=get(16),
            record_header_size=get(17) // HeaderEnum.WordSize,
            data_record_size=get(18) // HeaderEnum.WordSize,
            number_of_records_written=get(19),
            frequency_clock=float(get(20)) + float(get(21)) / 1e3,
            fast_cols=get(28),
            slow_cols=get(29),
            n_rows=get(30),
            data_size=(get(18) - get(17)) // HeaderEnum.WordSize,
        )
        return byte_header

    def __get(self, pos: int, block: bytes = b"", fmt: str = "<H") -> int:
        p = pos * HeaderEnum.WordSize
        v = int(struct.unpack(fmt, block[p : p + HeaderEnum.WordSize])[0])
        return v

    def check_for_bad_blocks(self, fp: io.BufferedReader) -> int:
        """Checks for bad blocks

        Method to check for bad blocks. The method reads all headers,
        and requires that the number of data blocks read matches the
        number of data blocks as written in the header. If these do
        not match, this points to an error in the data file.

        If something does not match, an exception is thrown.

        Parameters
        ----------
        fp: io.BufferedReader
            file descriptor pointing to an open file

        Returns
        -------
        int:
            number of data blocks read.

        """
        current_fp_location: int = fp.tell()
        fp.seek(0, 2)  # go to the end
        file_size: int = fp.tell()
        fp.seek(0, 0)  # rewind
        record_number: int = 0
        bytes_read: int = 0
        while True:
            # we are going to read 128 bytes. Make sure that that is possible
            if bytes_read == file_size:
                break  # we are done
            if file_size - bytes_read > HeaderEnum.HeaderSize:
                byte_header = self.parse(fp)
                bytes_read += HeaderEnum.HeaderSize
                if byte_header.record_number == record_number:
                    if byte_header.record_number == 0:
                        s = byte_header.setupfile_size
                    else:
                        s = byte_header.data_size * HeaderEnum.WordSize
                    bytes_read += s
                    if bytes_read > file_size:
                        raise uriderException("File seems truncated.")
                    fp.seek(s, 1)
                    record_number += 1
                else:
                    raise uriderException("Missing record(s).")
            else:
                # we cannot read header file, although there are bytes to read.
                raise uriderException("File seems truncated.")
        fp.seek(current_fp_location, 0)  # go back to where we started.
        record_number -= (
            1  # We count the first record too, but this is not a data record.
        )
        return record_number

    def read_setupstring(self, fd: io.BufferedReader, byte_header: ByteHeader) -> str:
        """Reads the setupstring from the binary file

        Parameters
        ----------
        fd : file descriptor

        byte_header: ByteHeader
            ByteHeader dataclass

        Returns
        -------
        str
            setupstring.

        """
        fd_current = fd.tell()
        fd.seek(HeaderEnum.HeaderSize, 0)
        setupfile_size = byte_header.setupfile_size
        bytestr = fd.read(setupfile_size)
        s = bytestr.decode()
        fd.seek(fd_current, 0)
        return s

check_for_bad_blocks(fp)

Checks for bad blocks

Method to check for bad blocks. The method reads all headers, and requires that the number of data blocks read matches the number of data blocks as written in the header. If these do not match, this points to an error in the data file.

If something does not match, an exception is thrown.

Parameters

fp: io.BufferedReader file descriptor pointing to an open file

Returns

int: number of data blocks read.

Source code in turban/instruments/microrider/rsIO.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def check_for_bad_blocks(self, fp: io.BufferedReader) -> int:
    """Checks for bad blocks

    Method to check for bad blocks. The method reads all headers,
    and requires that the number of data blocks read matches the
    number of data blocks as written in the header. If these do
    not match, this points to an error in the data file.

    If something does not match, an exception is thrown.

    Parameters
    ----------
    fp: io.BufferedReader
        file descriptor pointing to an open file

    Returns
    -------
    int:
        number of data blocks read.

    """
    current_fp_location: int = fp.tell()
    fp.seek(0, 2)  # go to the end
    file_size: int = fp.tell()
    fp.seek(0, 0)  # rewind
    record_number: int = 0
    bytes_read: int = 0
    while True:
        # we are going to read 128 bytes. Make sure that that is possible
        if bytes_read == file_size:
            break  # we are done
        if file_size - bytes_read > HeaderEnum.HeaderSize:
            byte_header = self.parse(fp)
            bytes_read += HeaderEnum.HeaderSize
            if byte_header.record_number == record_number:
                if byte_header.record_number == 0:
                    s = byte_header.setupfile_size
                else:
                    s = byte_header.data_size * HeaderEnum.WordSize
                bytes_read += s
                if bytes_read > file_size:
                    raise uriderException("File seems truncated.")
                fp.seek(s, 1)
                record_number += 1
            else:
                raise uriderException("Missing record(s).")
        else:
            # we cannot read header file, although there are bytes to read.
            raise uriderException("File seems truncated.")
    fp.seek(current_fp_location, 0)  # go back to where we started.
    record_number -= (
        1  # We count the first record too, but this is not a data record.
    )
    return record_number

parse(fd)

Parses header block of a data record. It is assumed that the block is HeaderEnum.HeaderSize large.

Parameters

fd : io.BufferedReader file descriptor pointing to an open file

Returns

ByteHeader: a dataclass with (byte) header information

Source code in turban/instruments/microrider/rsIO.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def parse(self, fd: io.BufferedReader) -> ByteHeader:
    """Parses header block of a data record. It is assumed that the block is HeaderEnum.HeaderSize large.

    Parameters
    ----------
    fd : io.BufferedReader
        file descriptor pointing to an open file

    Returns
    -------
    ByteHeader:
        a dataclass with (byte) header information
    """
    block = fd.read(HeaderEnum.HeaderSize)
    match self.__get(HeaderEnum.Endianness, block=block, fmt="<H"):
        case 0:
            get = partial(self.__get, block=block, fmt="<H")
        case 1 | 256:  # little endian
            get = partial(self.__get, block=block, fmt="<H")
        case 2 | 512:  # big endian
            get = partial(self.__get, block=block, fmt=">H")
        case _:
            raise ValueError(
                "Failed to determine the endianness of this block of data."
            )

    # From the manual...
    byte_header = ByteHeader(
        file_number=get(0),
        record_number=get(1),
        record_number_serial_port=get(2),
        year=get(3),
        month=get(4),
        day=get(5),
        hour=get(6),
        minute=get(7),
        second=get(8),
        millisecond=get(9),
        header_version=float(get(10) >> 8) + float((get(10) & 0x0F)) / 1000,
        setupfile_size=get(11),
        product_ID=get(12),
        build_number=get(13),
        timezone_in_minutes=get(14),
        buffer_status=get(15),
        restarted=get(16),
        record_header_size=get(17) // HeaderEnum.WordSize,
        data_record_size=get(18) // HeaderEnum.WordSize,
        number_of_records_written=get(19),
        frequency_clock=float(get(20)) + float(get(21)) / 1e3,
        fast_cols=get(28),
        slow_cols=get(29),
        n_rows=get(30),
        data_size=(get(18) - get(17)) // HeaderEnum.WordSize,
    )
    return byte_header

read_setupstring(fd, byte_header)

Reads the setupstring from the binary file

Parameters

fd : file descriptor

ByteHeader

ByteHeader dataclass

Returns

str setupstring.

Source code in turban/instruments/microrider/rsIO.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def read_setupstring(self, fd: io.BufferedReader, byte_header: ByteHeader) -> str:
    """Reads the setupstring from the binary file

    Parameters
    ----------
    fd : file descriptor

    byte_header: ByteHeader
        ByteHeader dataclass

    Returns
    -------
    str
        setupstring.

    """
    fd_current = fd.tell()
    fd.seek(HeaderEnum.HeaderSize, 0)
    setupfile_size = byte_header.setupfile_size
    bytestr = fd.read(setupfile_size)
    s = bytestr.decode()
    fd.seek(fd_current, 0)
    return s

MicroRiderData

Bases: object

Data class to hold all data collected by the RSI MicroRider.

parameters

string

full path to .P file

config: system configuration, extracted from the setupfile string.

Source code in turban/instruments/microrider/rsIO.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
class MicroRiderData(object):
    """Data class to hold all data collected by the RSI MicroRider.

    parameters
    ----------

    path : string
        full path to .P file

    config: system configuration, extracted from the setupfile string.

    """

    def __init__(self, path: str, config: rsConfig_parser.MicroRiderConfig) -> None:
        self.config: rsConfig_parser.MicroRiderConfig = config
        self.header: Header
        self.channel_matrix: ChannelMatrix = ChannelMatrix(config)
        self._regex_deconvolve_preemph_name = re.compile(r"([TP].?)_d([TP].?)")
        self._regex_deconvolve_name = re.compile(r"^(T[12]|P)$")
        self._path = path

    def __getattr__(self, name: str) -> Channel:
        try:
            return self.channel_matrix.channels[name]
        except KeyError:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{name}'"
            )

    def keys(self) -> list[str]:
        return list(self.channel_matrix.channels.keys())

    def add_header_data(self, byte_header: ByteHeader, n_records: int) -> None:
        """Adds specific parameters from the byteheader to the data structure

        Parameters
        ----------
        byte_header : ByteHeader
            ByteHeader dataclass
        """
        n_cols = byte_header.fast_cols + byte_header.slow_cols
        matrix_count = (
            n_records * byte_header.data_size // (byte_header.n_rows * n_cols)
        )
        fs_fast = byte_header.frequency_clock / n_cols
        fs_slow = byte_header.frequency_clock / n_cols / byte_header.n_rows
        header_version = byte_header.header_version
        t_slow = np.arange(matrix_count) / fs_slow
        t_fast = np.arange(matrix_count * byte_header.n_rows) / fs_fast
        timestamp, datestring, timestring = self.get_file_timestamp(byte_header)

        self.header = Header(
            full_path=os.path.realpath(self._path),
            n_cols=n_cols,
            n_records=n_records,
            fs_fast=fs_fast,
            fs_slow=fs_slow,
            header_version=header_version,
            matrix_count=matrix_count,
            t_fast=t_fast,
            t_slow=t_slow,
            timestamp=timestamp,
            date=datestring,
            time=timestring,
        )

    def get_file_timestamp(self, byte_header: ByteHeader) -> tuple[float, str, str]:
        """Gets time stamp of the file

        Parameters
        ----------
        byte_header : ByteHeader
            ByteHeader dataclass with the header parameters

        Returns
        -------
        (float, str, str):
            timestamp, date string, time string

        The methods return time stamp (seconds since 1970), date string
        and time string

        """
        date = arrow.get(
            byte_header.year,
            byte_header.month,
            byte_header.day,
            byte_header.hour,
            byte_header.minute,
            byte_header.second,
            byte_header.millisecond,
        )
        timestamp = date.timestamp()
        datestr = date.strftime("%Y-%m-%d")
        timestr = date.strftime("%H-%M-%S")
        return timestamp, datestr, timestr

    def add_channel_data(self, data_snr: np_typing.NDArray[np.int16]) -> None:
        """Adds the channel data, from array with dtype i2/u2

        Parameters
        ----------

        data_snr : array
            array with bytes converted to signed and unsigned 16 bit integers.

        The method adds all the numeric data conveyed in this matrix to the specific
        channels. It uses the sensor matrix to determine which part goes to what
        channel.

        Note: this channel adds a keyword to the configuration: depending on the
              number of values per record, it adds whether this is a fast or
              slow senor if the frequency is 512 or 64, respectively.
        """
        n_matrix_rows, _ = self.channel_matrix.matrix.shape
        channel_positions = self.channel_matrix.matrix.flatten()
        for channel_name, channel in self.channel_matrix.channels.items():
            logger.debug(f"Channel name : {channel_name}")
            i_channel = channel.config.id
            idx = np.where(channel_positions == i_channel)[0]
            n_data_per_record = len(idx)
            if n_data_per_record == n_matrix_rows:
                channel.config.update("sample_rate", self.header.fs_fast)
            elif n_data_per_record == 1:
                channel.config.update("sample_rate", self.header.fs_slow)
            else:
                channel.config.update(
                    "sample_rate", 0
                )  # we don't need to know this for this channel anyway.
            # correct the sign of the data if necessary
            channel.data = data_snr[:, idx].flatten()
            channel.correct_sign()

    def combine_split_channels(self) -> None:
        """Combines even and odd channels into one

        All channels that are labelled *_E and *_O are combined into one, using 32 bit ints.

        Note: This method has not thorougly been tested because the test data don't have any
              split channels. It was tested modifying the data file, which has been reverted
              back again.
        """
        channels = self.channel_matrix.channels
        even_channels = [ch for ch in channels.keys() if ch.endswith("_E")]
        for ch_even in even_channels:
            ch_odd = ch_even.replace("_E", "_O")
            ch = ch_even.replace("_E", "")
            if not ch_odd in channels.keys():
                logger.warning(f"Found channel {ch_even}, but did not find {ch_odd}")
                continue  # No odd channel found.
            self.channel_matrix.channels[ch] = channels[ch_even] + channels[ch_odd]
            self.channel_matrix.channels.pop(ch_even)
            self.channel_matrix.channels.pop(ch_odd)

    def convert_channels(self) -> None:
        """Converts all channels

        Converts all channels to data with physical units. If a channel
        needs to be deconvolved first, this is also done (and leads to
        a new channel).
        """
        for n, ch in self.channel_matrix.channels.copy().items():
            regex_preemph = self._regex_deconvolve_preemph_name.match(n)
            regex = self._regex_deconvolve_name.match(n)
            if regex_preemph:
                # These channels need first to be deconvolved.
                logger.info(f"Deconvolving and converting {n}")
                self.deconvolve_and_convert_channel(ch)
            elif (
                not regex
            ):  # skipping T1 T2 and P, because they are interpolated from the deconvolved versions
                logger.info(f"Converting {n}")
                ch.convert_to_units()
            else:
                logger.debug(
                    f"Skipping {n} for now, because, they are interpolated from the deconvolved versions???"
                )

    def deconvolve_and_convert_channel(self, ch: Channel) -> None:
        """Treats channels that need deconvolving separately

        Note: this method creates a new channel, with the name
              based on the primary channel (e.g. T1) with "_hires" added.


        """
        channel_name = ch.name
        co_channel_name, _ = ch.name.split("_")
        co_channel = self.channel_matrix.channels[co_channel_name]
        # this channel should not be converted yet.
        if ch.converted_into_units:
            raise ValueError(
                "This preemph channel is converted to units, but should be deconvolved first."
            )
        if co_channel.converted_into_units:
            raise ValueError(
                "The channel (without preemphasis) is converted to units, but should be usedin the deconvolution first."
            )
        logger.debug(f"Trying to deconvolve {channel_name} ({co_channel_name})")
        if not np.isclose(ch.config.sample_rate, self.header.fs_fast):
            ch_fast = self.interpolate_onto_fast_channel(ch)
        else:
            ch_fast = ch
        if not np.isclose(co_channel.config.sample_rate, self.header.fs_fast):
            co_channel_fast = self.interpolate_onto_fast_channel(co_channel)
        else:
            co_channel_fast = co_channel
        X = co_channel_fast.data  # not converted yet, so OK.
        X_dX = ch_fast.data
        if hasattr(ch_fast.config, "diff_gain"):
            diff_gain = ch_fast.config.diff_gain  # <- is required to be present
        else:
            raise AttributeError(
                "diff_gain is not an attribute for this configuration object."
            )
        fs = self.header.fs_fast

        new_channel_name = f"{co_channel_name}_hires"
        new_channel = co_channel.copy(
            new_channel_name
        )  # copy config from T1, name it T1_res
        new_channel.data = rsConversions.Deconvolve(
            X_dX.astype(np.float64), X.astype(np.float64), fs, diff_gain
        ).X_hires
        new_channel.convert_to_units()
        self.channel_matrix.channels[new_channel_name] = new_channel
        # interpolate co channel.
        logger.info(
            f"Converting {co_channel_name} by means of interpolation of unit-bearing hires channel"
        )
        self.interpolate_co_channel(co_channel, new_channel)

    def interpolate_onto_fast_channel(self, ch: Channel) -> Channel:
        logger.debug(f"Interpolating {ch.name} to a fast channel...")
        ch_hires = ch.copy("dummy")
        ch_hires.config.sample_rate = self.header.fs_fast
        ifun = si_interp1d(
            self.header.t_slow, ch.data, kind="cubic", fill_value="extrapolate"
        )
        ch_hires.data = ifun(self.header.t_fast).astype(np.float64)
        return ch_hires

    def interpolate_co_channel(
        self, co_channel: Channel, hires_channel: Channel
    ) -> None:
        if np.isclose(co_channel.config.sample_rate, self.header.fs_slow):
            tm_i = self.header.t_slow
            ifun = si_interp1d(
                self.header.t_fast,
                hires_channel.data,
                kind="cubic",
                fill_value="extrapolate",
            )
            co_channel.data = ifun(tm_i).astype(np.float64)
            co_channel.converted_into_units = True
        else:
            raise ValueError(
                "Trying to interpolate to hires from a hires vector. Fix me."
            )

add_channel_data(data_snr)

Adds the channel data, from array with dtype i2/u2

Parameters
array

array with bytes converted to signed and unsigned 16 bit integers.

The method adds all the numeric data conveyed in this matrix to the specific channels. It uses the sensor matrix to determine which part goes to what channel.

this channel adds a keyword to the configuration: depending on the

number of values per record, it adds whether this is a fast or slow senor if the frequency is 512 or 64, respectively.

Source code in turban/instruments/microrider/rsIO.py
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def add_channel_data(self, data_snr: np_typing.NDArray[np.int16]) -> None:
    """Adds the channel data, from array with dtype i2/u2

    Parameters
    ----------

    data_snr : array
        array with bytes converted to signed and unsigned 16 bit integers.

    The method adds all the numeric data conveyed in this matrix to the specific
    channels. It uses the sensor matrix to determine which part goes to what
    channel.

    Note: this channel adds a keyword to the configuration: depending on the
          number of values per record, it adds whether this is a fast or
          slow senor if the frequency is 512 or 64, respectively.
    """
    n_matrix_rows, _ = self.channel_matrix.matrix.shape
    channel_positions = self.channel_matrix.matrix.flatten()
    for channel_name, channel in self.channel_matrix.channels.items():
        logger.debug(f"Channel name : {channel_name}")
        i_channel = channel.config.id
        idx = np.where(channel_positions == i_channel)[0]
        n_data_per_record = len(idx)
        if n_data_per_record == n_matrix_rows:
            channel.config.update("sample_rate", self.header.fs_fast)
        elif n_data_per_record == 1:
            channel.config.update("sample_rate", self.header.fs_slow)
        else:
            channel.config.update(
                "sample_rate", 0
            )  # we don't need to know this for this channel anyway.
        # correct the sign of the data if necessary
        channel.data = data_snr[:, idx].flatten()
        channel.correct_sign()

add_header_data(byte_header, n_records)

Adds specific parameters from the byteheader to the data structure

Parameters

byte_header : ByteHeader ByteHeader dataclass

Source code in turban/instruments/microrider/rsIO.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def add_header_data(self, byte_header: ByteHeader, n_records: int) -> None:
    """Adds specific parameters from the byteheader to the data structure

    Parameters
    ----------
    byte_header : ByteHeader
        ByteHeader dataclass
    """
    n_cols = byte_header.fast_cols + byte_header.slow_cols
    matrix_count = (
        n_records * byte_header.data_size // (byte_header.n_rows * n_cols)
    )
    fs_fast = byte_header.frequency_clock / n_cols
    fs_slow = byte_header.frequency_clock / n_cols / byte_header.n_rows
    header_version = byte_header.header_version
    t_slow = np.arange(matrix_count) / fs_slow
    t_fast = np.arange(matrix_count * byte_header.n_rows) / fs_fast
    timestamp, datestring, timestring = self.get_file_timestamp(byte_header)

    self.header = Header(
        full_path=os.path.realpath(self._path),
        n_cols=n_cols,
        n_records=n_records,
        fs_fast=fs_fast,
        fs_slow=fs_slow,
        header_version=header_version,
        matrix_count=matrix_count,
        t_fast=t_fast,
        t_slow=t_slow,
        timestamp=timestamp,
        date=datestring,
        time=timestring,
    )

combine_split_channels()

Combines even and odd channels into one

All channels that are labelled _E and _O are combined into one, using 32 bit ints.

This method has not thorougly been tested because the test data don't have any

split channels. It was tested modifying the data file, which has been reverted back again.

Source code in turban/instruments/microrider/rsIO.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def combine_split_channels(self) -> None:
    """Combines even and odd channels into one

    All channels that are labelled *_E and *_O are combined into one, using 32 bit ints.

    Note: This method has not thorougly been tested because the test data don't have any
          split channels. It was tested modifying the data file, which has been reverted
          back again.
    """
    channels = self.channel_matrix.channels
    even_channels = [ch for ch in channels.keys() if ch.endswith("_E")]
    for ch_even in even_channels:
        ch_odd = ch_even.replace("_E", "_O")
        ch = ch_even.replace("_E", "")
        if not ch_odd in channels.keys():
            logger.warning(f"Found channel {ch_even}, but did not find {ch_odd}")
            continue  # No odd channel found.
        self.channel_matrix.channels[ch] = channels[ch_even] + channels[ch_odd]
        self.channel_matrix.channels.pop(ch_even)
        self.channel_matrix.channels.pop(ch_odd)

convert_channels()

Converts all channels

Converts all channels to data with physical units. If a channel needs to be deconvolved first, this is also done (and leads to a new channel).

Source code in turban/instruments/microrider/rsIO.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
def convert_channels(self) -> None:
    """Converts all channels

    Converts all channels to data with physical units. If a channel
    needs to be deconvolved first, this is also done (and leads to
    a new channel).
    """
    for n, ch in self.channel_matrix.channels.copy().items():
        regex_preemph = self._regex_deconvolve_preemph_name.match(n)
        regex = self._regex_deconvolve_name.match(n)
        if regex_preemph:
            # These channels need first to be deconvolved.
            logger.info(f"Deconvolving and converting {n}")
            self.deconvolve_and_convert_channel(ch)
        elif (
            not regex
        ):  # skipping T1 T2 and P, because they are interpolated from the deconvolved versions
            logger.info(f"Converting {n}")
            ch.convert_to_units()
        else:
            logger.debug(
                f"Skipping {n} for now, because, they are interpolated from the deconvolved versions???"
            )

deconvolve_and_convert_channel(ch)

Treats channels that need deconvolving separately

this method creates a new channel, with the name

based on the primary channel (e.g. T1) with "_hires" added.

Source code in turban/instruments/microrider/rsIO.py
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
def deconvolve_and_convert_channel(self, ch: Channel) -> None:
    """Treats channels that need deconvolving separately

    Note: this method creates a new channel, with the name
          based on the primary channel (e.g. T1) with "_hires" added.


    """
    channel_name = ch.name
    co_channel_name, _ = ch.name.split("_")
    co_channel = self.channel_matrix.channels[co_channel_name]
    # this channel should not be converted yet.
    if ch.converted_into_units:
        raise ValueError(
            "This preemph channel is converted to units, but should be deconvolved first."
        )
    if co_channel.converted_into_units:
        raise ValueError(
            "The channel (without preemphasis) is converted to units, but should be usedin the deconvolution first."
        )
    logger.debug(f"Trying to deconvolve {channel_name} ({co_channel_name})")
    if not np.isclose(ch.config.sample_rate, self.header.fs_fast):
        ch_fast = self.interpolate_onto_fast_channel(ch)
    else:
        ch_fast = ch
    if not np.isclose(co_channel.config.sample_rate, self.header.fs_fast):
        co_channel_fast = self.interpolate_onto_fast_channel(co_channel)
    else:
        co_channel_fast = co_channel
    X = co_channel_fast.data  # not converted yet, so OK.
    X_dX = ch_fast.data
    if hasattr(ch_fast.config, "diff_gain"):
        diff_gain = ch_fast.config.diff_gain  # <- is required to be present
    else:
        raise AttributeError(
            "diff_gain is not an attribute for this configuration object."
        )
    fs = self.header.fs_fast

    new_channel_name = f"{co_channel_name}_hires"
    new_channel = co_channel.copy(
        new_channel_name
    )  # copy config from T1, name it T1_res
    new_channel.data = rsConversions.Deconvolve(
        X_dX.astype(np.float64), X.astype(np.float64), fs, diff_gain
    ).X_hires
    new_channel.convert_to_units()
    self.channel_matrix.channels[new_channel_name] = new_channel
    # interpolate co channel.
    logger.info(
        f"Converting {co_channel_name} by means of interpolation of unit-bearing hires channel"
    )
    self.interpolate_co_channel(co_channel, new_channel)

get_file_timestamp(byte_header)

Gets time stamp of the file

Parameters

byte_header : ByteHeader ByteHeader dataclass with the header parameters

Returns

(float, str, str): timestamp, date string, time string

The methods return time stamp (seconds since 1970), date string and time string

Source code in turban/instruments/microrider/rsIO.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def get_file_timestamp(self, byte_header: ByteHeader) -> tuple[float, str, str]:
    """Gets time stamp of the file

    Parameters
    ----------
    byte_header : ByteHeader
        ByteHeader dataclass with the header parameters

    Returns
    -------
    (float, str, str):
        timestamp, date string, time string

    The methods return time stamp (seconds since 1970), date string
    and time string

    """
    date = arrow.get(
        byte_header.year,
        byte_header.month,
        byte_header.day,
        byte_header.hour,
        byte_header.minute,
        byte_header.second,
        byte_header.millisecond,
    )
    timestamp = date.timestamp()
    datestr = date.strftime("%Y-%m-%d")
    timestr = date.strftime("%H-%M-%S")
    return timestamp, datestr, timestr

read_p_file(filename, setupstring_filename='')

Function to read a single .p file.

Parameters

filename : str Name of .p file to read setupstring_filename : str (Optional : "") Name of external setup file to be used.

Returns

MicroRiderData object Raw data, converted into physical units.

Source code in turban/instruments/microrider/rsIO.py
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
def read_p_file(filename: str, setupstring_filename: str = "") -> MicroRiderData:
    """Function to read a single .p file.

    Parameters
    ----------
    filename : str
         Name of .p file to read
    setupstring_filename : str (Optional : "")
         Name of external setup file to be used.

    Returns
    -------
    MicroRiderData object
        Raw data, converted into physical units.
    """
    header_parser = HeaderParser()
    microrider_config = rsConfig_parser.MicroRiderConfig()

    full_path = os.path.realpath(filename)

    if setupstring_filename:
        with open(setupstring_filename, "r") as fd:
            setupstring = fd.read()
    else:
        setupstring = ""

    with open(filename, "rb") as fd:
        header_parser = HeaderParser()
        header_data = header_parser.parse(fd)
        n_records = header_parser.check_for_bad_blocks(fd)
        if not setupstring:  # not set by external file
            # read embedded setupstring (leaves the fp in the correct place)
            setupstring = header_parser.read_setupstring(fd, header_data)
        microrider_config.parse(setupstring)
        data = MicroRiderData(full_path, microrider_config)
        data.add_header_data(header_data, n_records)
        fd.seek(HeaderEnum.HeaderSize + header_data.setupfile_size, 0)

        # Data to be interpreted as signed integers, header data as unsigned integers.
        dt_data = np.dtype(f">i{HeaderEnum.WordSize}")
        dt_hdr = np.dtype(f">u{HeaderEnum.WordSize}")

        # read the data and cast it in a numpy array
        bytes_data = fd.read()
        bindata = np.frombuffer(bytes_data, dtype=dt_data).reshape(
            -1, header_data.data_record_size
        )
        # Extract header and data
    data_hdr = bindata[:, : header_data.record_header_size].astype(
        dt_hdr
    )  # recast as unsigned
    n = data.channel_matrix.number_of_elements
    # Make sure we end up with int16, as we may get '>i2'
    data_snr = (
        bindata[:, header_data.record_header_size :].reshape(-1, n).astype(np.int16)
    )
    data.add_channel_data(data_snr)
    # data.combine_split_channels() # <- we don't have split channels. Cannot test.
    data.convert_channels()
    return data

Data Processing

Generic processing API (process/generic/api.py)

Defines high-level API to interact with TURBAN toolbox

AuxiliaryData dataclass

Bases: TimeseriesLevel

Mix-in class to provide support for non-essential auxiliary data

Source code in turban/process/generic/api.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@dataclass(kw_only=True)
class AuxiliaryData(TimeseriesLevel):
    """Mix-in class to provide support for non-essential auxiliary data"""

    # Here auxiliary data are stored along with aggregation instructions
    _aux_data: AuxDataTypehintLevel12 | AuxDataTypehintLevel34 | None = None

    def __post_init__(self):
        """Initialise ``_aux_data`` to an empty dict if it was not provided."""
        if self._aux_data is None:
            self._aux_data = {}

    def arrays_as_xr_dicts(self):
        """Separate array fields including auxiliary data into data-variable and coordinate dicts.

        Returns
        -------
        data_vars : dict
            Fields (including auxiliary data) not listed in ``_coords``.
        coords : dict
            Fields listed in ``_coords``.
        """
        data_vars, coords = super().arrays_as_xr_dicts()
        if self._level <= 2:
            data = cast(AuxDataTypehintLevel12, self._aux_data)
            data_vars.update(
                {varname: (dims, arr) for varname, (dims, arr, _) in data.items()}
            )
        else:
            data = cast(AuxDataTypehintLevel34, self._aux_data)
            data_vars.update(
                {varname: (dims, arr) for varname, (dims, arr) in data.items()}
            )
        return data_vars, coords

    @classmethod
    def from_xarray(cls, ds: xr.Dataset):
        """Instantiate from an xarray Dataset, loading auxiliary variables.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset; variables not recognised as core fields are added as auxiliary data.

        Returns
        -------
        AuxiliaryData
            New instance with core and auxiliary data populated.
        """
        new = super().from_xarray(ds)
        for name in ds:
            if not hasattr(new, name):
                if len(ds[name].shape) == 1:
                    new.add_aux_data(ds[name].values, name=name)
                else:
                    logger.warning(
                        (
                            f"Skipping potential auxiliary variable `{name}` defined on "
                            f"dataset as it has {len(ds[name].shape)} dimensions"
                        )
                    )
        return new

    def _variable_present(self, name):
        """Returns True if variable called `name` already present somewhere"""
        if name in self._aux_data:
            logger.warning(f"Variable`{name}` already exists in aux data, skipping")
        elif name in get_type_hints_recursive(type(self)):
            logger.warning(
                f"Variable `{name}` already defined on object itself, skipping"
            )
        else:
            return False

        return True

    def add_aux_data(
        self,
        data: (
            Num[ndarray, "time"]
            | AuxDataTypehintLevel12
            | AuxDataTypehintLevel34
            | None
        ),
        name: str | None = None,
        agg_method: str | None = "mean",
        name_out: str | None = None,
    ):
        """Adds auxiliary data to any level.

        If data is a 1D numpy array, uses simplified API.
        Must then supply `name`, `agg_method` and optionally `name_out`.

        Otherwise, use the full API.
        """
        if data is None:
            self._aux_data = {}

        elif isinstance(data, ndarray):
            # Simplified API
            if agg_method is None:
                raise ValueError("`agg_method` must not be None")
            data = cast(Num[ndarray, "time"], data)
            # so linters understand we have a dict after __post_init__
            if not self._variable_present(name):
                if self._level <= 2:
                    self._aux_data = cast(AuxDataTypehintLevel12, self._aux_data)
                    agg_method = cast(str, agg_method)  # agg_method must be str
                    # Construct the full aggregation instruction
                    self._aux_data[name] = (["time"], data, {agg_method: name_out})
                else:
                    self._aux_data = cast(AuxDataTypehintLevel34, self._aux_data)
                    self._aux_data[name] = (["time"], data)

        else:
            # Full API
            data = cast(AuxDataTypehintLevel12 | AuxDataTypehintLevel34, data)
            if self._level <= 2:
                self._aux_data = cast(AuxDataTypehintLevel12, self._aux_data)
            else:
                self._aux_data = cast(AuxDataTypehintLevel34, self._aux_data)
            # clean data for variables that are not present
            data = {
                varname: v
                for varname, v in data.items()
                if not self._variable_present(varname)
            }
            self._aux_data = data

__post_init__()

Initialise _aux_data to an empty dict if it was not provided.

Source code in turban/process/generic/api.py
157
158
159
160
def __post_init__(self):
    """Initialise ``_aux_data`` to an empty dict if it was not provided."""
    if self._aux_data is None:
        self._aux_data = {}

add_aux_data(data, name=None, agg_method='mean', name_out=None)

Adds auxiliary data to any level.

If data is a 1D numpy array, uses simplified API. Must then supply name, agg_method and optionally name_out.

Otherwise, use the full API.

Source code in turban/process/generic/api.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def add_aux_data(
    self,
    data: (
        Num[ndarray, "time"]
        | AuxDataTypehintLevel12
        | AuxDataTypehintLevel34
        | None
    ),
    name: str | None = None,
    agg_method: str | None = "mean",
    name_out: str | None = None,
):
    """Adds auxiliary data to any level.

    If data is a 1D numpy array, uses simplified API.
    Must then supply `name`, `agg_method` and optionally `name_out`.

    Otherwise, use the full API.
    """
    if data is None:
        self._aux_data = {}

    elif isinstance(data, ndarray):
        # Simplified API
        if agg_method is None:
            raise ValueError("`agg_method` must not be None")
        data = cast(Num[ndarray, "time"], data)
        # so linters understand we have a dict after __post_init__
        if not self._variable_present(name):
            if self._level <= 2:
                self._aux_data = cast(AuxDataTypehintLevel12, self._aux_data)
                agg_method = cast(str, agg_method)  # agg_method must be str
                # Construct the full aggregation instruction
                self._aux_data[name] = (["time"], data, {agg_method: name_out})
            else:
                self._aux_data = cast(AuxDataTypehintLevel34, self._aux_data)
                self._aux_data[name] = (["time"], data)

    else:
        # Full API
        data = cast(AuxDataTypehintLevel12 | AuxDataTypehintLevel34, data)
        if self._level <= 2:
            self._aux_data = cast(AuxDataTypehintLevel12, self._aux_data)
        else:
            self._aux_data = cast(AuxDataTypehintLevel34, self._aux_data)
        # clean data for variables that are not present
        data = {
            varname: v
            for varname, v in data.items()
            if not self._variable_present(varname)
        }
        self._aux_data = data

arrays_as_xr_dicts()

Separate array fields including auxiliary data into data-variable and coordinate dicts.

Returns

data_vars : dict Fields (including auxiliary data) not listed in _coords. coords : dict Fields listed in _coords.

Source code in turban/process/generic/api.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def arrays_as_xr_dicts(self):
    """Separate array fields including auxiliary data into data-variable and coordinate dicts.

    Returns
    -------
    data_vars : dict
        Fields (including auxiliary data) not listed in ``_coords``.
    coords : dict
        Fields listed in ``_coords``.
    """
    data_vars, coords = super().arrays_as_xr_dicts()
    if self._level <= 2:
        data = cast(AuxDataTypehintLevel12, self._aux_data)
        data_vars.update(
            {varname: (dims, arr) for varname, (dims, arr, _) in data.items()}
        )
    else:
        data = cast(AuxDataTypehintLevel34, self._aux_data)
        data_vars.update(
            {varname: (dims, arr) for varname, (dims, arr) in data.items()}
        )
    return data_vars, coords

from_xarray(ds) classmethod

Instantiate from an xarray Dataset, loading auxiliary variables.

Parameters

ds : xr.Dataset Dataset; variables not recognised as core fields are added as auxiliary data.

Returns

AuxiliaryData New instance with core and auxiliary data populated.

Source code in turban/process/generic/api.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@classmethod
def from_xarray(cls, ds: xr.Dataset):
    """Instantiate from an xarray Dataset, loading auxiliary variables.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset; variables not recognised as core fields are added as auxiliary data.

    Returns
    -------
    AuxiliaryData
        New instance with core and auxiliary data populated.
    """
    new = super().from_xarray(ds)
    for name in ds:
        if not hasattr(new, name):
            if len(ds[name].shape) == 1:
                new.add_aux_data(ds[name].values, name=name)
            else:
                logger.warning(
                    (
                        f"Skipping potential auxiliary variable `{name}` defined on "
                        f"dataset as it has {len(ds[name].shape)} dimensions"
                    )
                )
    return new

HasLevelBelow dataclass

Bases: TimeseriesLevel

Source code in turban/process/generic/api.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
@dataclass(kw_only=True)
class HasLevelBelow(TimeseriesLevel):
    level_below: TimeseriesLevel | None = None

    @classmethod
    @abstractmethod
    def _from_level_below_kwarg(cls, data: TimeseriesLevel | None) -> dict:
        """Provides dictionary `kwarg` that can be passed into class constructor as cls(**kwarg)."""
        return {}

    @classmethod
    def from_level_below(cls, data: TimeseriesLevel | None):
        """Construct this level from the level-below data object.

        Parameters
        ----------
        data : TimeseriesLevel or None
            Data from the level below.

        Returns
        -------
        HasLevelBelow
            New instance initialised from ``data``.
        """
        return cls(**cls._from_level_below_kwarg(data))

    @property
    def cfg(self) -> SegmentConfig:
        if self.level_below is not None:
            return self.level_below.cfg
        elif hasattr(self, "_cfg"):
            return self._cfg
        else:
            raise ValueError

    @cfg.setter
    def cfg(self, value):
        if self.level_below is not None:
            self.level_below.cfg = value
        else:
            self._cfg = value

from_level_below(data) classmethod

Construct this level from the level-below data object.

Parameters

data : TimeseriesLevel or None Data from the level below.

Returns

HasLevelBelow New instance initialised from data.

Source code in turban/process/generic/api.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@classmethod
def from_level_below(cls, data: TimeseriesLevel | None):
    """Construct this level from the level-below data object.

    Parameters
    ----------
    data : TimeseriesLevel or None
        Data from the level below.

    Returns
    -------
    HasLevelBelow
        New instance initialised from ``data``.
    """
    return cls(**cls._from_level_below_kwarg(data))

Level2 dataclass

Bases: HasLevelBelow, AuxiliaryData

Source code in turban/process/generic/api.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
@dataclass(kw_only=True)
class Level2(HasLevelBelow, AuxiliaryData):
    senspeed: Float[ndarray, "time"]
    section_number: Int[ndarray, "time"]

    _level: ClassVar[int] = 2

    @classmethod
    def _from_level_below_kwarg(cls, data: Level1) -> dict:
        """Build constructor kwargs for Level2 from Level1 data.

        Parameters
        ----------
        data : Level1
            Level 1 data object.

        Returns
        -------
        dict
            Keyword arguments to pass to the Level2 constructor.
        """
        return dict(
            time=data.time,
            senspeed=data.senspeed,
            section_number=data.section_number,
            _aux_data=data._aux_data,
        )

Level3 dataclass

Bases: HasLevelBelow, AuxiliaryData

Source code in turban/process/generic/api.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
@dataclass(kw_only=True)
class Level3(HasLevelBelow, AuxiliaryData):
    waveno: Float[ndarray, "time waveno"]
    freq: Float[ndarray, "waveno"]
    senspeed: Float[ndarray, "time"]
    section_number: Int[ndarray, "time"]

    _coords = ["time", "freq"]

    _level: ClassVar[int] = 3

    @classmethod
    def _from_level_below_kwarg(cls, data: Level2) -> dict:
        """Build constructor kwargs for Level3 from Level2 data, aggregating aux data.

        Parameters
        ----------
        data : Level2
            Level 2 data object.

        Returns
        -------
        dict
            Keyword arguments to pass to the Level3 constructor.
        """
        return dict(
            time=data.time,
            _aux_data=cls.agg(
                data=cast(AuxDataTypehintLevel12, data._aux_data),
                data_len=len(data.time),
                chunk_length=data.cfg.chunk_length,
                chunk_overlap=data.cfg.chunk_overlap,
                section_number=data.section_number,
            ),
        )

    @classmethod
    def agg(
        cls,
        data: AuxDataTypehintLevel12,
        data_len: int,
        chunk_length: int,
        chunk_overlap: int,
        section_number: Int[ndarray, "time"],
    ) -> AuxDataTypehintLevel34:
        """Aggregates data from Level2. These are stored in  in the form:
        {variable_name_fast: ([dims], ndarray, {agg_method: variable_new_slow}])}
        """
        slow = {}

        for varname, (dims, arr, rename_dict) in data.items():
            for agg_method, varname_new in rename_dict.items():
                if varname_new is None:
                    varname_new = f"{varname}_{agg_method}"
                slow[varname_new] = (
                    dims,
                    agg_fast_to_slow(
                        arr,
                        section_number_or_data_len=section_number,
                        chunk_length=chunk_length,
                        chunk_overlap=chunk_overlap,
                        agg_method=agg_method,
                    ),
                )
        return slow

agg(data, data_len, chunk_length, chunk_overlap, section_number) classmethod

Aggregates data from Level2. These are stored in in the form: {variable_name_fast: ([dims], ndarray, {agg_method: variable_new_slow}])}

Source code in turban/process/generic/api.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
@classmethod
def agg(
    cls,
    data: AuxDataTypehintLevel12,
    data_len: int,
    chunk_length: int,
    chunk_overlap: int,
    section_number: Int[ndarray, "time"],
) -> AuxDataTypehintLevel34:
    """Aggregates data from Level2. These are stored in  in the form:
    {variable_name_fast: ([dims], ndarray, {agg_method: variable_new_slow}])}
    """
    slow = {}

    for varname, (dims, arr, rename_dict) in data.items():
        for agg_method, varname_new in rename_dict.items():
            if varname_new is None:
                varname_new = f"{varname}_{agg_method}"
            slow[varname_new] = (
                dims,
                agg_fast_to_slow(
                    arr,
                    section_number_or_data_len=section_number,
                    chunk_length=chunk_length,
                    chunk_overlap=chunk_overlap,
                    agg_method=agg_method,
                ),
            )
    return slow

Level4 dataclass

Bases: HasLevelBelow, AuxiliaryData

Source code in turban/process/generic/api.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
@dataclass(kw_only=True)
class Level4(HasLevelBelow, AuxiliaryData):
    section_number: Int[ndarray, "time"]

    _level: ClassVar[int] = 4

    @classmethod
    def _from_level_below_kwarg(cls, data: Level3) -> dict:
        """Build constructor kwargs for Level4 from Level3 data.

        Parameters
        ----------
        data : Level3
            Level 3 data object.

        Returns
        -------
        dict
            Keyword arguments to pass to the Level4 constructor.
        """
        return dict(
            time=data.time,
            section_number=data.section_number,
            _aux_data=data._aux_data,
        )

Processing

Bases: ABC

Propagates a given start level up to level 4.

Source code in turban/process/generic/api.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
class Processing(ABC):
    """Propagates a given start level up to level 4."""

    @property
    @abstractmethod
    def _level_mapping(self) -> dict:
        """Mapping from level number to the corresponding level class.

        Returns
        -------
        dict
            Dict of ``{int: TimeseriesLevel subclass}`` for levels 1–4.
        """
        # Slightly clumsy way of requiring a class attribute called _level_mapping
        return {1: Level1, 2: Level2, 3: Level3, 4: Level4}

    def __init__(
        self,
        data: TimeseriesLevel,
    ):
        """Propagate ``data`` up to level 4 using the level mapping.

        Parameters
        ----------
        data : TimeseriesLevel
            Starting data object at any level.
        """
        for l in range(data._level + 1, 5):
            data = self._level_mapping[l].from_level_below(data)
        self.data = data

    @property
    def level1(self):
        """Level 1 data, or None if not available."""
        if self.level2 is None:
            return None
        else:
            return self.level2.level_below

    @property
    def level2(self):
        """Level 2 data, or None if not available."""
        if self.level3 is None:
            return None
        else:
            return self.level3.level_below

    @property
    def level3(self):
        """Level 3 data, or None if not available."""
        if self.level4 is None:
            return None
        else:
            return self.level4.level_below

    @property
    def level4(self):
        """Level 4 data (the topmost processed level)."""
        return self.data

    @property
    def cfg(self):
        """Segment configuration from the top-level data object."""
        return self.level4.cfg

    @property
    def data_len_fast(self):
        """Length of the fast time vector (levels 1 and 2)"""
        if self.level2 is None:
            return None
        else:
            return self.level2.time.shape[-1]

    def to_xarray(self):
        """Export"""
        out_data = {}
        for data in [self.level1, self.level2, self.level3, self.level4]:
            if data is not None:
                out = data.to_xarray()
                out_data[f"level{data._level}"] = xr.DataTree(out)

        data_tree = xr.DataTree(children=out_data)
        return data_tree

    @classmethod
    def from_xarray(cls, data_tree: xr.DataTree):
        """Instantiate pipeline from lowest available level"""
        for level, class_ in cls._level_mapping.items():
            level_str = f"level{level:d}"
            if level_str in data_tree:
                logger.debug(f"Start processing from level {level}")
                data = class_.from_xarray(data_tree[level_str].to_dataset())
                break
        else:
            raise ValueError(f"Could not find any data.")

        return cls(data)

cfg property

Segment configuration from the top-level data object.

data_len_fast property

Length of the fast time vector (levels 1 and 2)

level1 property

Level 1 data, or None if not available.

level2 property

Level 2 data, or None if not available.

level3 property

Level 3 data, or None if not available.

level4 property

Level 4 data (the topmost processed level).

__init__(data)

Propagate data up to level 4 using the level mapping.

Parameters

data : TimeseriesLevel Starting data object at any level.

Source code in turban/process/generic/api.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def __init__(
    self,
    data: TimeseriesLevel,
):
    """Propagate ``data`` up to level 4 using the level mapping.

    Parameters
    ----------
    data : TimeseriesLevel
        Starting data object at any level.
    """
    for l in range(data._level + 1, 5):
        data = self._level_mapping[l].from_level_below(data)
    self.data = data

from_xarray(data_tree) classmethod

Instantiate pipeline from lowest available level

Source code in turban/process/generic/api.py
567
568
569
570
571
572
573
574
575
576
577
578
579
@classmethod
def from_xarray(cls, data_tree: xr.DataTree):
    """Instantiate pipeline from lowest available level"""
    for level, class_ in cls._level_mapping.items():
        level_str = f"level{level:d}"
        if level_str in data_tree:
            logger.debug(f"Start processing from level {level}")
            data = class_.from_xarray(data_tree[level_str].to_dataset())
            break
    else:
        raise ValueError(f"Could not find any data.")

    return cls(data)

to_xarray()

Export

Source code in turban/process/generic/api.py
556
557
558
559
560
561
562
563
564
565
def to_xarray(self):
    """Export"""
    out_data = {}
    for data in [self.level1, self.level2, self.level3, self.level4]:
        if data is not None:
            out = data.to_xarray()
            out_data[f"level{data._level}"] = xr.DataTree(out)

    data_tree = xr.DataTree(children=out_data)
    return data_tree

TimeseriesLevel dataclass

Source code in turban/process/generic/api.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@dataclass(kw_only=True)
class TimeseriesLevel:
    time: Shaped[ndarray, "time"]

    cfg: SegmentConfig  # only define this here - other levels get it through HasLevelBelow
    _coords: ClassVar[list[str]] = ["time"]
    _level: ClassVar[int]

    def arrays_as_xr_dicts(self):
        """Separate array-typed fields into data-variable and coordinate dicts.

        Returns
        -------
        data_vars : dict
            Fields not listed in ``_coords``, in xarray ``(dims, data)`` format.
        coords : dict
            Fields listed in ``_coords``, in xarray ``(dims, data)`` format.
        """
        dct = {
            name: (
                [dim.name for dim in t.dims],
                getattr(self, name),
            )
            for name, t in get_type_hints_recursive(type(self)).items()
            if isclass(t)
            if issubclass(t, AbstractArray)
        }
        data_vars = {k: v for k, v in dct.items() if k not in self._coords}
        coords = {k: v for k, v in dct.items() if k in self._coords}
        return data_vars, coords

    def to_xarray(self):
        """Export this level to an xarray Dataset.

        Returns
        -------
        xr.Dataset
            Dataset containing all array fields as data variables and coordinates,
            with TURBAN standard attributes and config stored as global attributes.
        """
        data_vars, coords = self.arrays_as_xr_dicts()
        ds = xr.Dataset(data_vars=data_vars, coords=coords)
        for varname in set(list(ds.data_vars) + list(ds.coords)):
            ds[varname].attrs.update(VARIABLES.get(varname, {}))

        # add config as global attributes
        self.cfg.add_to_xarray(ds)
        return ds

    @classmethod
    def from_xarray(cls, ds: xr.Dataset):
        """Instantiate this level from an xarray Dataset.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset containing array fields and config global attributes.

        Returns
        -------
        TimeseriesLevel
            New instance populated from the dataset.
        """
        data = {}
        hints = get_type_hints_recursive(cls)
        for name, arr in dict(**ds.data_vars, **ds.coords).items():
            if name in hints:
                logger.debug(f"Adding `{name}` to data")
                data[name] = arr.values

        data["cfg"] = hints["cfg"].from_xarray(ds)

        return cls(**data)

    def get_attr(self, name):
        """Retrieve an attribute by name.

        Parameters
        ----------
        name : str
            Name of the attribute to retrieve.

        Returns
        -------
        object
            Value of the attribute.
        """
        attr = getattr(self, name)
        if isinstance(attr, SegmentConfig):
            return getattr()

arrays_as_xr_dicts()

Separate array-typed fields into data-variable and coordinate dicts.

Returns

data_vars : dict Fields not listed in _coords, in xarray (dims, data) format. coords : dict Fields listed in _coords, in xarray (dims, data) format.

Source code in turban/process/generic/api.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def arrays_as_xr_dicts(self):
    """Separate array-typed fields into data-variable and coordinate dicts.

    Returns
    -------
    data_vars : dict
        Fields not listed in ``_coords``, in xarray ``(dims, data)`` format.
    coords : dict
        Fields listed in ``_coords``, in xarray ``(dims, data)`` format.
    """
    dct = {
        name: (
            [dim.name for dim in t.dims],
            getattr(self, name),
        )
        for name, t in get_type_hints_recursive(type(self)).items()
        if isclass(t)
        if issubclass(t, AbstractArray)
    }
    data_vars = {k: v for k, v in dct.items() if k not in self._coords}
    coords = {k: v for k, v in dct.items() if k in self._coords}
    return data_vars, coords

from_xarray(ds) classmethod

Instantiate this level from an xarray Dataset.

Parameters

ds : xr.Dataset Dataset containing array fields and config global attributes.

Returns

TimeseriesLevel New instance populated from the dataset.

Source code in turban/process/generic/api.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@classmethod
def from_xarray(cls, ds: xr.Dataset):
    """Instantiate this level from an xarray Dataset.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing array fields and config global attributes.

    Returns
    -------
    TimeseriesLevel
        New instance populated from the dataset.
    """
    data = {}
    hints = get_type_hints_recursive(cls)
    for name, arr in dict(**ds.data_vars, **ds.coords).items():
        if name in hints:
            logger.debug(f"Adding `{name}` to data")
            data[name] = arr.values

    data["cfg"] = hints["cfg"].from_xarray(ds)

    return cls(**data)

get_attr(name)

Retrieve an attribute by name.

Parameters

name : str Name of the attribute to retrieve.

Returns

object Value of the attribute.

Source code in turban/process/generic/api.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def get_attr(self, name):
    """Retrieve an attribute by name.

    Parameters
    ----------
    name : str
        Name of the attribute to retrieve.

    Returns
    -------
    object
        Value of the attribute.
    """
    attr = getattr(self, name)
    if isinstance(attr, SegmentConfig):
        return getattr()

to_xarray()

Export this level to an xarray Dataset.

Returns

xr.Dataset Dataset containing all array fields as data variables and coordinates, with TURBAN standard attributes and config stored as global attributes.

Source code in turban/process/generic/api.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def to_xarray(self):
    """Export this level to an xarray Dataset.

    Returns
    -------
    xr.Dataset
        Dataset containing all array fields as data variables and coordinates,
        with TURBAN standard attributes and config stored as global attributes.
    """
    data_vars, coords = self.arrays_as_xr_dicts()
    ds = xr.Dataset(data_vars=data_vars, coords=coords)
    for varname in set(list(ds.data_vars) + list(ds.coords)):
        ds[varname].attrs.update(VARIABLES.get(varname, {}))

    # add config as global attributes
    self.cfg.add_to_xarray(ds)
    return ds

get_type_hints_recursive(cls)

Collect all type definitions on a given class, also from parent classes.

:param obj: Any python object

Source code in turban/process/generic/api.py
44
45
46
47
48
49
50
51
52
53
54
55
def get_type_hints_recursive(cls) -> dict:
    """
    Collect all type definitions on a given class, also from parent classes.

    :param obj: Any python object
    """
    hints = {}
    # loop through MRO in reverse order if anything is superseded by inheritance
    for type_ in cls.mro()[::-1]:
        hints.update(get_type_hints(type_))

    return hints

Generic processing configuration (process/generic/config.py)

SegmentConfig

Bases: BaseModel

Configures segment-wise processing of timeseries.

Source code in turban/process/generic/config.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class SegmentConfig(BaseModel):
    """Configures segment-wise processing of timeseries."""

    sampfreq: float  # [Hz]
    segment_length: int
    segment_overlap: int
    chunk_length: int
    chunk_overlap: int

    @staticmethod
    def _attrs_from_atomix_netcdf(fname):
        """Extract SegmentConfig attributes from an ATOMIX netCDF file.

        Parameters
        ----------
        fname : str
            Path to the ATOMIX netCDF file.

        Returns
        -------
        dict
            Configuration attributes with keys: ``sampfreq``, ``segment_length``,
            ``segment_overlap``, ``chunk_length``, ``chunk_overlap``.
        """
        with Dataset(fname) as f:
            return dict(
                sampfreq=f.fs_fast,
                segment_length=int(f.fft_length),
                segment_overlap=int(f.fft_length / 2),
                chunk_length=int(f.diss_length),
                chunk_overlap=int(f.overlap),
            )

    @classmethod
    def from_atomix_netcdf(cls, fname):
        """Create a SegmentConfig from an ATOMIX netCDF file.

        Parameters
        ----------
        fname : str
            Path to the ATOMIX netCDF file.

        Returns
        -------
        SegmentConfig
            New instance with attributes read from the file.
        """
        kwarg = cls._attrs_from_atomix_netcdf(fname)
        return cls(**kwarg)

    @property
    def number_fft_windows_per_chunk(self):
        """N_f in the ATOMIX paper"""
        fft_segment_start = np.arange(
            0,
            self.chunk_length - self.segment_length + 1,
            self.segment_length - self.segment_overlap,
        )  # start of each fft segment within a chunk
        return len(fft_segment_start)

    def add_to_xarray(self, ds: xr.Dataset):
        """Write configuration fields as global attributes of an xarray Dataset.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset whose ``attrs`` will be updated in-place.
        """
        ds.attrs.update(self.model_dump())

    @classmethod
    def from_xarray(cls, ds: xr.Dataset):
        """Create from xarray Dataset global attributes.

        Parameters
        ----------
        ds : xr.Dataset
            Dataset whose ``attrs`` contain the configuration fields.

        Returns
        -------
        SegmentConfig
            Validated instance constructed from the dataset attributes.
        """
        return cls.model_validate(ds.attrs)

number_fft_windows_per_chunk property

N_f in the ATOMIX paper

add_to_xarray(ds)

Write configuration fields as global attributes of an xarray Dataset.

Parameters

ds : xr.Dataset Dataset whose attrs will be updated in-place.

Source code in turban/process/generic/config.py
67
68
69
70
71
72
73
74
75
def add_to_xarray(self, ds: xr.Dataset):
    """Write configuration fields as global attributes of an xarray Dataset.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset whose ``attrs`` will be updated in-place.
    """
    ds.attrs.update(self.model_dump())

from_atomix_netcdf(fname) classmethod

Create a SegmentConfig from an ATOMIX netCDF file.

Parameters

fname : str Path to the ATOMIX netCDF file.

Returns

SegmentConfig New instance with attributes read from the file.

Source code in turban/process/generic/config.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def from_atomix_netcdf(cls, fname):
    """Create a SegmentConfig from an ATOMIX netCDF file.

    Parameters
    ----------
    fname : str
        Path to the ATOMIX netCDF file.

    Returns
    -------
    SegmentConfig
        New instance with attributes read from the file.
    """
    kwarg = cls._attrs_from_atomix_netcdf(fname)
    return cls(**kwarg)

from_xarray(ds) classmethod

Create from xarray Dataset global attributes.

Parameters

ds : xr.Dataset Dataset whose attrs contain the configuration fields.

Returns

SegmentConfig Validated instance constructed from the dataset attributes.

Source code in turban/process/generic/config.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@classmethod
def from_xarray(cls, ds: xr.Dataset):
    """Create from xarray Dataset global attributes.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset whose ``attrs`` contain the configuration fields.

    Returns
    -------
    SegmentConfig
        Validated instance constructed from the dataset attributes.
    """
    return cls.model_validate(ds.attrs)

Shear processing API (process/shear/api.py)

NetcdfReader

Load any netcdf with variable mapping from turban standard name (see variables.py) to name in the netcdf. NB This class is still under construction.

TODO Load the variable mapping directly from variables.py.

Source code in turban/process/shear/api.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
class NetcdfReader:
    """Load any netcdf with variable mapping from turban standard name (see variables.py)
    to name in the netcdf.
    NB This class is still under construction.

    TODO Load the variable mapping directly from variables.py."""

    def __init__(self, varmap: dict[str, str] | Literal["atomix"] | None = None):
        if varmap is None:
            self._map = {}
        elif varmap == "atomix":
            self._map = {
                "time": "L1_converted/TIME",
                # 'L1_converted/SHEAR',
                # 'L1_converted/TIME_CTD',
                # 'L1_converted/PSPD_REL',
                "press": "L1_converted/PRES",
                # 'L1_converted/VIB',
                "temp": "L1_converted/TEMP",
                # 'L1_converted/TEMP_CTD',
                "cond": "L1_converted/CNDC",
            }
        else:
            self._map = varmap

    def read(self, fname: str, vars: list[str]) -> list[ndarray]:
        with Dataset(fname) as ds:
            data = [ds[self._map[var]][:] for var in vars]
        return data

ShearLevel1 dataclass

Bases: Level1

Source code in turban/process/shear/api.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@dataclass(kw_only=True)
class ShearLevel1(Level1):
    shear: Float[ndarray, "nshear time"]
    section_number: Int[ndarray, "time"]
    cfg: ShearConfig

    @classmethod
    def from_atomix_netcdf(cls, fname: str):
        """Load ShearLevel1 from an ATOMIX netCDF file.

        Parameters
        ----------
        fname : str
            Path to the ATOMIX netCDF file.

        Returns
        -------
        ShearLevel1
            Level 1 instance populated from the file.
        """
        ds = xr.load_dataset(fname, group="L1_converted")
        # TODO: handle section_number through level 2
        ds2 = xr.load_dataset(fname, group="L2_cleaned")
        return cls(
            time=ds.TIME.values,
            senspeed=ds.PSPD_REL.values if "PSPD_REL" in ds else ds2.PSPD_REL.values,
            shear=ds.SHEAR.values,
            section_number=ds2["SECTION_NUMBER"].values.astype(int),
            cfg=cast(ShearConfig, ShearConfig.from_atomix_netcdf(fname)),
        )

from_atomix_netcdf(fname) classmethod

Load ShearLevel1 from an ATOMIX netCDF file.

Parameters

fname : str Path to the ATOMIX netCDF file.

Returns

ShearLevel1 Level 1 instance populated from the file.

Source code in turban/process/shear/api.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@classmethod
def from_atomix_netcdf(cls, fname: str):
    """Load ShearLevel1 from an ATOMIX netCDF file.

    Parameters
    ----------
    fname : str
        Path to the ATOMIX netCDF file.

    Returns
    -------
    ShearLevel1
        Level 1 instance populated from the file.
    """
    ds = xr.load_dataset(fname, group="L1_converted")
    # TODO: handle section_number through level 2
    ds2 = xr.load_dataset(fname, group="L2_cleaned")
    return cls(
        time=ds.TIME.values,
        senspeed=ds.PSPD_REL.values if "PSPD_REL" in ds else ds2.PSPD_REL.values,
        shear=ds.SHEAR.values,
        section_number=ds2["SECTION_NUMBER"].values.astype(int),
        cfg=cast(ShearConfig, ShearConfig.from_atomix_netcdf(fname)),
    )

ShearLevel2 dataclass

Bases: Level2

Source code in turban/process/shear/api.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@dataclass(kw_only=True)
class ShearLevel2(Level2):
    shear: Float[ndarray, "nshear time"]
    section_number: Int[ndarray, "time"]
    num_despike_iter: Int[ndarray, "nshear time"]
    cfg: ShearConfig

    @classmethod
    def _from_level_below_kwarg(
        cls,
        data: ShearLevel1,
    ):
        """Build constructor kwargs for ShearLevel2 by despiking ShearLevel1 shear.

        Parameters
        ----------
        data : ShearLevel1
            Level 1 data containing raw shear and configuration.

        Returns
        -------
        dict
            Keyword arguments to pass to the ShearLevel2 constructor.
        """
        kwarg = super()._from_level_below_kwarg(data)
        level1 = data
        cfg = cast(ShearConfig, level1.cfg)  # just for type checkers to understand type
        sh_cleaned, num_despike_iter = process_level2(
            level1.shear,
            level1.section_number,
            cfg.sampfreq,
            cfg.segment_length,
            cfg.cutoff_freq_lp,
            cfg.spike_threshold,
            cfg.max_tries,
            cfg.spike_replace_before,
            cfg.spike_replace_after,
            cfg.spike_include_before,
            cfg.spike_include_after,
        )

        kwarg.update(
            dict(
                time=level1.time,
                shear=sh_cleaned,
                senspeed=level1.senspeed,
                section_number=level1.section_number,
                num_despike_iter=num_despike_iter,
                level_below=level1,
            )
        )
        return kwarg

    @classmethod
    def from_atomix_netcdf(cls, fname: str):
        """Load ShearLevel2 from an ATOMIX netCDF file.

        Parameters
        ----------
        fname : str
            Path to the ATOMIX netCDF file.

        Returns
        -------
        ShearLevel2
            Level 2 instance populated from the file (with Level1 attached below).
        """
        ds = xr.load_dataset(fname, group="L2_cleaned")
        return cls(
            time=ds.TIME.values,
            shear=ds.SHEAR.values,
            senspeed=ds.PSPD_REL.values,
            section_number=ds["SECTION_NUMBER"].values.astype(int),
            # TODO: this does not seem to be saved in (all?) ATOMIX files
            num_despike_iter=9999 * np.zeros_like(ds.SHEAR.values, dtype=int),
            level_below=ShearLevel1.from_atomix_netcdf(fname),
        )

from_atomix_netcdf(fname) classmethod

Load ShearLevel2 from an ATOMIX netCDF file.

Parameters

fname : str Path to the ATOMIX netCDF file.

Returns

ShearLevel2 Level 2 instance populated from the file (with Level1 attached below).

Source code in turban/process/shear/api.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@classmethod
def from_atomix_netcdf(cls, fname: str):
    """Load ShearLevel2 from an ATOMIX netCDF file.

    Parameters
    ----------
    fname : str
        Path to the ATOMIX netCDF file.

    Returns
    -------
    ShearLevel2
        Level 2 instance populated from the file (with Level1 attached below).
    """
    ds = xr.load_dataset(fname, group="L2_cleaned")
    return cls(
        time=ds.TIME.values,
        shear=ds.SHEAR.values,
        senspeed=ds.PSPD_REL.values,
        section_number=ds["SECTION_NUMBER"].values.astype(int),
        # TODO: this does not seem to be saved in (all?) ATOMIX files
        num_despike_iter=9999 * np.zeros_like(ds.SHEAR.values, dtype=int),
        level_below=ShearLevel1.from_atomix_netcdf(fname),
    )

ShearLevel3 dataclass

Bases: Level3

Source code in turban/process/shear/api.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
@dataclass(kw_only=True)
class ShearLevel3(Level3):
    psi_k_sh: Float[ndarray, "nshear time waveno"]
    psi_f_sh: Float[ndarray, "nshear time waveno"]
    # TODO load from atomix netcdf
    spike_fraction: Float[ndarray, "nshear time"]
    max_despike_iter: Int[ndarray, "nshear time"]
    cfg: ShearConfig

    @classmethod
    def _from_level_below_kwarg(
        cls,
        data: ShearLevel2,
    ) -> dict:
        kwarg = super()._from_level_below_kwarg(data)
        k, psi_k_sh, psi_f_sh, freq, senspeed, section_number = process_level3(
            shear=data.shear,
            senspeed=data.senspeed,
            section_number=data.section_number,
            segment_length=data.cfg.segment_length,
            sampfreq=data.cfg.sampfreq,
            spatial_response_wavenum=data.cfg.spatial_response_wavenum,
            freq_highpass=data.cfg.freq_highpass,
            segment_overlap=data.cfg.segment_overlap,
            chunk_length=data.cfg.chunk_length,
            chunk_overlap=data.cfg.chunk_overlap,
        )

        spikes_per_chunk = agg_fast_to_slow(
            data.num_despike_iter > 0,  # has been despiked
            chunk_length=data.cfg.chunk_length,
            chunk_overlap=data.cfg.chunk_overlap,
            section_number_or_data_len=data.section_number,
            agg_method="sum",  # count number of occurences
        )

        spike_fraction = spikes_per_chunk / data.cfg.chunk_length

        max_despike_iter = agg_fast_to_slow(
            data.num_despike_iter,
            chunk_length=data.cfg.chunk_length,
            chunk_overlap=data.cfg.chunk_overlap,
            section_number_or_data_len=data.section_number,
            agg_method="max",
        )

        time_slow = agg_fast_to_slow(
            data.time,
            chunk_length=data.cfg.chunk_length,
            chunk_overlap=data.cfg.chunk_overlap,
            section_number_or_data_len=data.section_number,
            agg_method="take_mid",
        )
        kwarg.update(
            dict(
                time=time_slow,
                psi_k_sh=psi_k_sh,
                waveno=k,
                psi_f_sh=psi_f_sh,
                freq=freq,
                senspeed=senspeed,
                section_number=section_number,
                spike_fraction=spike_fraction,
                max_despike_iter=max_despike_iter,
                level_below=data,
            )
        )
        return kwarg

    @classmethod
    def from_atomix_netcdf(cls, fname: str):
        ds = xr.load_dataset(fname, group="L3_spectra")
        ds = ds.transpose(
            ...,
            "N_SHEAR_SENSORS",
            "TIME_SPECTRA",
            "WAVENUMBER" if "WAVENUMBER" in ds.dims else "N_WAVENUMBER",
        )

        return cls(
            time=ds["TIME"].values,
            psi_k_sh=ds["SH_SPEC"].values,
            waveno=ds["KCYC"].values,
            psi_f_sh=ds["SH_SPEC"].values * np.nan,
            freq=np.nan * np.ones(ds["KCYC"].values.shape[-1]),
            senspeed=ds["PSPD_REL"].values,
            section_number=ds["SECTION_NUMBER"].values.astype(int),
            spike_fraction=np.nan * np.ones_like(ds["SH_SPEC"].values[:, :, 0]),
            # TODO: this does not seem to be saved in (all?) ATOMIX files
            max_despike_iter=9999
            * np.ones_like(ds["SH_SPEC"].values[:, :, 0], dtype=int),
            level_below=ShearLevel2.from_atomix_netcdf(fname),
        )

    @property
    def number_signals_vibration_removal(self):
        """N_V in the ATOMIX paper"""
        logger.warning("Not implemented")
        return 0

    @property
    def log_psi_var(self):
        r"""sigma^2_{ln\Psi} in the ATOMIX paper"""
        return (
            5
            / 4
            * (
                self.cfg.number_fft_windows_per_chunk
                - self.number_signals_vibration_removal
            )
            ** (-7 / 9)
        )

    @property
    def psi_k_sh_confidence_interval(self) -> Float[ndarray, "2 time waveno"]:
        """95% confidence interval of power spectrum.
        Eq. 23 in the ATOMIX paper"""
        return np.concatenate(
            (
                self.psi_k_sh * np.exp(1.96 * self.log_psi_var)[newaxis, ...],
                self.psi_k_sh * np.exp(-1.96 * self.log_psi_var)[newaxis, ...],
            ),
            axis=0,
        )

    @property
    def data_length(self) -> Float[ndarray, "time"]:
        r"""l_\epsilon in ATOMIX paper"""
        tau_eps = self.cfg.chunk_length / self.cfg.sampfreq
        return tau_eps * self.senspeed

data_length property

l_\epsilon in ATOMIX paper

log_psi_var property

sigma^2_{ln\Psi} in the ATOMIX paper

number_signals_vibration_removal property

N_V in the ATOMIX paper

psi_k_sh_confidence_interval property

95% confidence interval of power spectrum. Eq. 23 in the ATOMIX paper

ShearProcessing

Bases: Processing

Source code in turban/process/shear/api.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class ShearProcessing(Processing):

    _level_mapping = {1: ShearLevel1, 2: ShearLevel2, 3: ShearLevel3, 4: ShearLevel4}

    @classmethod
    def from_atomix_netcdf(
        cls,
        fname: str,
        level: Literal[1, 2, 3, 4],
        data_aux: AuxDataTypehintLevel12 | AuxDataTypehintLevel34 | None = None,
    ):
        """
        Create shear processing pipeline from ATOMIX netcdf file.

        Supplying data_aux here triggers the full API (dictionary with aggregation
        instructions if level<=2). If one wishes to use the simplified data aggregation
        API, one should first create a ShearLevelN object, then use .add_aux_data().
        """
        data = cls._level_mapping[level].from_atomix_netcdf(fname)
        data.add_aux_data(data_aux)
        return cls(data)

from_atomix_netcdf(fname, level, data_aux=None) classmethod

Create shear processing pipeline from ATOMIX netcdf file.

Supplying data_aux here triggers the full API (dictionary with aggregation instructions if level<=2). If one wishes to use the simplified data aggregation API, one should first create a ShearLevelN object, then use .add_aux_data().

Source code in turban/process/shear/api.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
@classmethod
def from_atomix_netcdf(
    cls,
    fname: str,
    level: Literal[1, 2, 3, 4],
    data_aux: AuxDataTypehintLevel12 | AuxDataTypehintLevel34 | None = None,
):
    """
    Create shear processing pipeline from ATOMIX netcdf file.

    Supplying data_aux here triggers the full API (dictionary with aggregation
    instructions if level<=2). If one wishes to use the simplified data aggregation
    API, one should first create a ShearLevelN object, then use .add_aux_data().
    """
    data = cls._level_mapping[level].from_atomix_netcdf(fname)
    data.add_aux_data(data_aux)
    return cls(data)

Shear processing configuration (process/shear/config.py)

Microtemperature processing API (process/utemp/api.py)

UTempLevel2 dataclass

Bases: Level2

Source code in turban/process/utemp/api.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass(kw_only=True)
class UTempLevel2(Level2):
    senspeed: Float[ndarray, "time"]
    dtempdt: Float[ndarray, "ntemp time"]
    # num_despike_iter: Int[ndarray, "n_shear time"]

    @classmethod
    def _from_level_below_kwarg(
        cls,
        data: UTempLevel1,
    ):
        """Build constructor kwargs for UTempLevel2 from UTempLevel1 data.

        Parameters
        ----------
        data : UTempLevel1
            Level 1 microtemperature data.

        Returns
        -------
        dict
            Keyword arguments to pass to the UTempLevel2 constructor.
        """
        kwarg = super()._from_level_below_kwarg(data)
        level1 = data
        cfg = cast(UTempConfig, level1.cfg)  # just for type checkers to understand type
        utemp_cleaned = level1.dtempdt
        # utemp_cleaned, num_despike_iter = process_level2(
        #     level1.utemp,
        #     level1.section_number,
        #     cfg.sampfreq,
        #     cfg.segment_length,
        # )

        kwarg.update(
            dict(
                time=level1.time,
                dtempdt=utemp_cleaned,
                senspeed=level1.senspeed,
                # num_despike_iter=num_despike_iter,
                level_below=level1,
                cfg=cfg,
            )
        )
        return kwarg

UTempLevel3 dataclass

Bases: Level3

Source code in turban/process/utemp/api.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@dataclass(kw_only=True)
class UTempLevel3(Level3):
    psi_k: Float[ndarray, "ntemp time freq"]
    psi_noise: Float[ndarray, "ntemp freq"]
    psi_f: Float[ndarray, "ntemp time freq"]

    @classmethod
    def _from_level_below_kwarg(
        cls,
        data: UTempLevel2,
    ) -> dict:
        """Build constructor kwargs for UTempLevel3 by computing temperature gradient spectra.

        Parameters
        ----------
        data : UTempLevel2
            Level 2 microtemperature data.

        Returns
        -------
        dict
            Keyword arguments to pass to the UTempLevel3 constructor.
        """
        level2 = data
        cfg = cast(UTempConfig, level2.cfg)
        kwarg = super()._from_level_below_kwarg(level2)

        (
            waveno,
            psi_k,
            psi_f,
            freq,
            senspeed_avg,
            section_number_slow,
            psi_noise,
            reshape_index,
        ) = temperature_gradient_spectra(
            dtempdt=level2.dtempdt,
            senspeed=level2.senspeed,
            segment_length=cfg.segment_length,
            segment_overlap=cfg.segment_overlap,
            chunk_length=cfg.chunk_length,
            chunk_overlap=cfg.chunk_overlap,
            sampfreq=cfg.sampfreq,
            waveno_limit_upper=cfg.waveno_limit_upper,
            diff_gain=cfg.diff_gain,
            section_number=level2.section_number,
        )

        kwarg.update(
            dict(
                time=agg_fast_to_slow(
                    level2.time,
                    section_number_or_data_len=level2.section_number,
                    chunk_length=level2.cfg.chunk_length,
                    chunk_overlap=level2.cfg.chunk_overlap,
                    agg_method="take_mid",
                ),
                psi_k=psi_k,
                psi_noise=psi_noise,
                waveno=waveno,
                psi_f=psi_f,
                freq=freq,
                senspeed=senspeed_avg,
                section_number=section_number_slow,
                level_below=level2,
                cfg=cfg,
            )
        )
        return kwarg

Microtemperature processing configuration (process/utemp/config.py)

UTempConfig

Bases: SegmentConfig

Configures processing of timeseries using temperature gradient spectra.

Source code in turban/process/utemp/config.py
4
5
6
7
8
class UTempConfig(SegmentConfig):
    """Configures processing of timeseries using temperature gradient spectra."""

    waveno_limit_upper: float = 500.0
    diff_gain: float

Utilities

Utility functions (utils/util.py)

agg_fast_to_slow(x, section_number_or_data_len=None, chunk_length=None, chunk_overlap=None, agg_method='mean', reshape_index=None)

Aggregate any quantities from fast sampling rate (e.g., shear timeseries) to slow sampling rate (e.g, spectra). If reshape_index is supplied, section_number_or_data_len, chunk_length and chunk_overlap are disregarded

Parameters

method: "take_first": use first value of every chunk "take_mid": use midpoint value of every chunk "take_last": use last value of every chunk any other: use numpy function (e.g., "mean", "max", ... )

agg_method can be anything that is an attribute of a numpy array, e.g. mean, max, etc.

Source code in turban/utils/util.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def agg_fast_to_slow(
    x: Shaped[ndarray, "*any time_fast"],
    section_number_or_data_len: Int[ndarray, "num_data"] | int | None = None,
    chunk_length: int | None = None,
    chunk_overlap: int | None = None,
    agg_method: Literal["take_first", "take_mid", "take_last"] | str = "mean",
    reshape_index: Int[ndarray, "diss_chunk fft_chunk"] | None = None,
) -> Shaped[ndarray, "*any time_slow"]:
    """
    Aggregate any quantities from fast sampling rate (e.g., shear timeseries)
    to slow sampling rate (e.g, spectra).
    If reshape_index is supplied, `section_number_or_data_len`, `chunk_length` and
    `chunk_overlap` are disregarded

    Parameters
    ----------
    method:
        "take_first": use first value of every chunk
        "take_mid": use midpoint value of every chunk
        "take_last": use last value of every chunk
        any other: use numpy function (e.g., "mean", "max", ... )

    `agg_method` can be anything that is an attribute of a numpy array, e.g. `mean`,
    `max`, etc.
    """
    if reshape_index is None:
        ii = get_chunking_index(
            section_number_or_data_len,
            (chunk_length, chunk_overlap),
        )
    else:
        ii = reshape_index

    match agg_method:
        case "take_first":
            return x[..., ii[:, 0]]
        case "take_mid":
            return x[..., ii[:, ii.shape[1] // 2]]
        case "take_last":
            return x[..., ii[:, -1]]
        case "grad":
            raise NotImplementedError("Cannot calculate gradients yet")

    # no other method fit the bill, so look in numpy:
    xi: Shaped[ndarray, "*any time_slow chunk_length"] = x[..., ii]
    return getattr(np, agg_method)(xi, axis=-1)

atleast_nd_last(arr, targetshape)

Prepend size-1 axes until arr has the same number of dimensions as targetshape.

Parameters

arr : ndarray, shape (..., dim0) Input array to expand. targetshape : tuple of int Target shape whose number of dimensions is the goal.

Returns

ndarray Array with leading size-1 axes added as needed.

Source code in turban/utils/util.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def atleast_nd_last(arr: Float[ndarray, "... dim0"], targetshape: tuple[int, ...]):
    """Prepend size-1 axes until ``arr`` has the same number of dimensions as ``targetshape``.

    Parameters
    ----------
    arr : ndarray, shape (..., dim0)
        Input array to expand.
    targetshape : tuple of int
        Target shape whose number of dimensions is the goal.

    Returns
    -------
    ndarray
        Array with leading size-1 axes added as needed.
    """
    for _ in range(len(targetshape) - len(arr.shape)):
        arr = arr[np.newaxis, ...]
    return arr

boolarr_to_sections(bools)

Partition boolean array into contiguous True-valued index ranges.

Parameters

bools : Bool[ndarray, "time"] Boolean array.

Returns

list[list[int]] List of contiguous index ranges where bools is True.

Source code in turban/utils/util.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
def boolarr_to_sections(bools: Bool[ndarray, "time"]) -> list[list[int]]:
    """Partition boolean array into contiguous True-valued index ranges.

    Parameters
    ----------
    bools : Bool[ndarray, "time"]
        Boolean array.

    Returns
    -------
    list[list[int]]
        List of contiguous index ranges where `bools` is True.
    """
    bools_as_ints = list(map(int, bools))
    sections = []
    # make sure first and last sections are picked up
    # even if bordering on list start or end
    offset = 0
    if bools[0]:
        bools_as_ints.insert(0, 0)
        offset = 1
    if bools[-1]:
        bools_as_ints.append(0)

    # register every jump from True to False
    true_to_false = np.diff(bools_as_ints) == -1
    # vice versa
    false_to_true = np.diff(bools_as_ints) == 1

    for ic0, ic1 in zip(np.flatnonzero(false_to_true), np.flatnonzero(true_to_false)):
        sections.append(list(range(ic0 + 1 - offset, ic1 + 1 - offset)))

    return sections

butterfilt(signal, cutoff_freq_Hz, sampfreq, axis=-1, **kwarg)

Apply first-order Butterworth filter.

Parameters

signal : array_like Input signal. cutoff_freq_Hz : float Cutoff frequency in Hz. sampfreq : float Sampling frequency in Hz. **kwarg Additional keyword arguments passed to scipy.signal.butter.

Returns

ndarray Filtered signal.

Source code in turban/utils/util.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def butterfilt(signal, cutoff_freq_Hz, sampfreq, axis=-1, **kwarg):
    """Apply first-order Butterworth filter.

    Parameters
    ----------
    signal : array_like
        Input signal.
    cutoff_freq_Hz : float
        Cutoff frequency in Hz.
    sampfreq : float
        Sampling frequency in Hz.
    **kwarg
        Additional keyword arguments passed to `scipy.signal.butter`.

    Returns
    -------
    ndarray
        Filtered signal.
    """
    # nondimensionalize using Nyquist freq
    cutoff_nondim = cutoff_freq_Hz / (sampfreq / 2)
    b, a = butter(N=1, Wn=cutoff_nondim, **kwarg)
    return filtfilt(b, a, signal, axis=axis)

define_sections(*data_and_bounds, segment_min_len=0, trim=0)

Identify contiguous sections where all data fall within specified bounds.

Parameters

data_and_bounds : tuple[Float[ndarray, "any"], float, float] Variable number of (data_array, lo, up) tuples specifying data arrays and their lower and upper bounds. Sections must satisfy all bounds. segment_min_len : int, optional Minimum segment length to retain. Default is 0. trim : int, optional Positive values shrink sections by this amount; negative values widen them. Default is 0.

Returns

Int[ndarray, "*any"] Section numbers (0 for invalid regions, positive integers for valid sections).

Source code in turban/utils/util.py
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
def define_sections(
    *data_and_bounds: tuple[Float[ndarray, "*any"], float, float],
    segment_min_len: int = 0,
    trim: int = 0,
) -> Int[ndarray, "*any"]:
    """Identify contiguous sections where all data fall within specified bounds.

    Parameters
    ----------
    *data_and_bounds : tuple[Float[ndarray, "*any"], float, float]
        Variable number of (data_array, lo, up) tuples specifying data arrays
        and their lower and upper bounds. Sections must satisfy all bounds.
    segment_min_len : int, optional
        Minimum segment length to retain. Default is 0.
    trim : int, optional
        Positive values shrink sections by this amount; negative values widen them.
        Default is 0.

    Returns
    -------
    Int[ndarray, "*any"]
        Section numbers (0 for invalid regions, positive integers for valid sections).
    """
    data_shp = data_and_bounds[0][0].shape  # select first data entry as sample
    inds = np.ones(data_shp, dtype=bool)
    for data, lo, up in data_and_bounds:
        if lo is not None:
            inds = inds & (lo <= np.array(data))
        if up is not None:
            inds = inds & (up >= np.array(data))

    if trim != 0:
        abstrim = abs(trim)
        inds_rollpos = np.roll(inds, abstrim)
        inds_rollneg = np.roll(inds, -abstrim)
        # handle wraparound of np.roll
        inds_rollpos[:abstrim] = False
        inds_rollneg[-abstrim:] = False

        if trim > 0:
            # trim so that sections become shorter
            inds = inds & inds_rollneg & inds_rollpos
        else:
            # sections become longer
            inds = inds | inds_rollneg | inds_rollpos

    sections = boolarr_to_sections(inds)

    if segment_min_len > 0:
        sections = [sec for sec in sections if len(sec) >= segment_min_len]

    section_number = np.zeros(data_shp, dtype=int)
    for k, sec in enumerate(sections):
        section_number[sec] = k + 1

    return section_number

diss_chunk_wise_reshape_index(reshape_index)

Flatten last two dimensions while ensuring unique indices per diss_chunk.

Parameters

reshape_index : Int[ndarray, "diss_chunk fft_chunk segment_length"] 3D index array to flatten.

Returns

Int[ndarray, "diss_chunk chunk_length"] 2D index array with unique indices for each diss_chunk.

Source code in turban/utils/util.py
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def diss_chunk_wise_reshape_index(
    reshape_index: Int[ndarray, "diss_chunk fft_chunk segment_length"],
) -> Int[ndarray, "diss_chunk chunk_length"]:
    """Flatten last two dimensions while ensuring unique indices per diss_chunk.

    Parameters
    ----------
    reshape_index : Int[ndarray, "diss_chunk fft_chunk segment_length"]
        3D index array to flatten.

    Returns
    -------
    Int[ndarray, "diss_chunk chunk_length"]
        2D index array with unique indices for each diss_chunk.
    """
    ii = reshape_index
    ii_flat = ii.reshape((ii.shape[0], ii.shape[1] * ii.shape[2]))
    chunk_length = ii_flat[0].max() - ii_flat[0].min() + 1
    return ii_flat.min(axis=1)[:, newaxis] + np.arange(0, chunk_length)[newaxis, :]

ensure_reshape_index(func)

Decorate func to accept alternative chunking parameters.

Enables func to accept section_number_or_data_len, segment_length, segment_overlap, chunk_length, and chunk_overlap as alternatives to reshape_index.

Parameters

func : callable Function to decorate.

Returns

callable Decorated function with reshape_index parameter resolution.

Source code in turban/utils/util.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def ensure_reshape_index(func):
    """Decorate `func` to accept alternative chunking parameters.

    Enables `func` to accept `section_number_or_data_len`, `segment_length`,
    `segment_overlap`, `chunk_length`, and `chunk_overlap` as alternatives to
    `reshape_index`.

    Parameters
    ----------
    func : callable
        Function to decorate.

    Returns
    -------
    callable
        Decorated function with reshape_index parameter resolution.
    """

    @wraps(func)
    def decorated(
        *argv,
        section_number_or_data_len: Int[ndarray, "num_data"] | int | None = None,
        segment_length: int | None = None,
        segment_overlap: int | None = None,
        chunk_length: int | None = None,
        chunk_overlap: int | None = None,
        **kwarg,
    ):
        if "reshape_index" not in kwarg or kwarg["reshape_index"] is None:
            # now the other parameters cannot be None
            section_number_or_data_len = cast(
                Int[ndarray, "num_data"] | int, section_number_or_data_len
            )
            segment_length = cast(int, segment_length)
            segment_overlap = cast(int, segment_overlap)
            chunk_length = cast(int, chunk_length)
            chunk_overlap = cast(int, chunk_overlap)
            kwarg["reshape_index"] = get_chunking_index(
                section_number_or_data_len,
                (chunk_length, chunk_overlap),
                (segment_length, segment_overlap),
            )
        elif (
            segment_length is not None
            or segment_overlap is not None
            or chunk_length is not None
            or chunk_overlap is not None
            or section_number_or_data_len is not None
        ):
            raise Warning(
                (
                    "Disregarding superfluous parameters: ",
                    "section_number_or_data_len, ",
                    "segment_length, ",
                    "segment_overlap, ",
                    "chunk_length, ",
                    "chunk_overlap.",
                )
            )

        return func(*argv, **kwarg)

    return decorated

fft_grad(signal, dt)

Compute gradient via FFT along last axis.

Parameters

signal : Float[ndarray, "... time"] Input signal. dt : float Time step in seconds.

Returns

Float[ndarray, "... freq"] Gradient computed via FFT.

Source code in turban/utils/util.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def fft_grad(
    signal: Float[ndarray, "... time"],
    dt: float,
) -> Float[ndarray, "... freq"]:
    """Compute gradient via FFT along last axis.

    Parameters
    ----------
    signal : Float[ndarray, "... time"]
        Input signal.
    dt : float
        Time step in seconds.

    Returns
    -------
    Float[ndarray, "... freq"]
        Gradient computed via FFT.
    """
    N = signal.shape[-1]
    x = np.concatenate(
        (signal, signal[::-1]), axis=-1
    )  # make periodic to avoid spectral leakage
    f = fftfreq(2 * N, dt)
    f_broadcast = atleast_nd_last(f, x.shape)
    dxdt = ifft(2 * np.pi * f_broadcast * 1j * fft(x, axis=-1), axis=-1)
    return dxdt[..., :N].real

get_chunking_index(section_number_or_data_len, *length_and_overlap)

Create index array for rechunking a dimension into overlapping segments.

Parameters

section_number_or_data_len : Int[ndarray, "time_fast"] or int Section markers (array of int) or data length (int). *length_and_overlap : tuple[int, int] Variable number of (length, overlap) tuples specifying chunk parameters to be applied successively.

Returns

Int[ndarray, "*any"] Index array for chunking.

Source code in turban/utils/util.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_chunking_index(
    section_number_or_data_len: Int[ndarray, "time_fast"] | int,
    *length_and_overlap: tuple[int, int],
) -> Int[ndarray, "*any"]:
    """Create index array for rechunking a dimension into overlapping segments.

    Parameters
    ----------
    section_number_or_data_len : Int[ndarray, "time_fast"] or int
        Section markers (array of int) or data length (int).
    *length_and_overlap : tuple[int, int]
        Variable number of (length, overlap) tuples specifying chunk parameters
        to be applied successively.

    Returns
    -------
    Int[ndarray, "*any"]
        Index array for chunking.
    """

    if isinstance(section_number_or_data_len, int):
        data_len = section_number_or_data_len
        section_number = np.ones(data_len, dtype=int)
    else:
        section_number = section_number_or_data_len
        data_len = len(section_number_or_data_len)

    indices = np.arange(data_len)
    sections = split_data(indices, section_number)

    for k, (length, overlap) in enumerate(length_and_overlap):
        if k == 0:
            reshape_segments = []
            for section in sections.values():
                ii: Int[ndarray, "Nchunks chunklen"] = reshape_overlap_index(
                    length, overlap, len(section)
                )
                reshape_segments.append(ii + section[0])
                ichunk = np.concatenate(reshape_segments, axis=0)

        else:
            # successive iterations
            ii: Int[ndarray, "Nchunks chunklen"] = reshape_overlap_index(
                length, overlap, ii.shape[-1]
            )
            ichunk = ichunk[..., ii]

    return ichunk

get_vsink(pressure_raw, sampfreq=1024.0)

Estimate sinking velocity from raw pressure by low-pass filtering and differentiation.

Parameters

pressure_raw : array_like Raw pressure time series. sampfreq : float, optional Sampling frequency in Hz. Default is 1024.0.

Returns

vsink : ndarray Estimated sinking velocity (dP/dt of low-pass filtered pressure). pressure_lp : ndarray Low-pass filtered pressure time series.

Source code in turban/utils/util.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def get_vsink(pressure_raw, sampfreq=1024.0):
    """Estimate sinking velocity from raw pressure by low-pass filtering and differentiation.

    Parameters
    ----------
    pressure_raw : array_like
        Raw pressure time series.
    sampfreq : float, optional
        Sampling frequency in Hz. Default is 1024.0.

    Returns
    -------
    vsink : ndarray
        Estimated sinking velocity (dP/dt of low-pass filtered pressure).
    pressure_lp : ndarray
        Low-pass filtered pressure time series.
    """
    # lowpass filter pressure
    pressure_lp = butterfilt(
        signal=pressure_raw,
        cutoff_freq_Hz=0.5,
        sampfreq=sampfreq,
        btype="low",
    )
    # sinking speed
    vsink = fft_grad(pressure_lp, 1 / sampfreq)
    return vsink, pressure_lp

integrate(y, x, x_from, x_to)

Integrate y over x along last axis using trapezoidal rule.

Parameters

y : Float[ndarray, "... time frequency"] Integrand values. x : Float[ndarray, "... time frequency"] Corresponding x-axis values. x_from : Float[ndarray, "... time"] Lower integration limits. x_to : Float[ndarray, "... time"] Upper integration limits.

Returns

Float[ndarray, "... time"] Integral along last axis.

Source code in turban/utils/util.py
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
def integrate(
    y: Float[ndarray, "... time frequency"],
    x: Float[ndarray, "... time frequency"],
    x_from: Float[ndarray, "... time"],
    x_to: Float[ndarray, "... time"],
) -> Float[ndarray, "... time"]:
    """Integrate `y` over `x` along last axis using trapezoidal rule.

    Parameters
    ----------
    y : Float[ndarray, "... time frequency"]
        Integrand values.
    x : Float[ndarray, "... time frequency"]
        Corresponding x-axis values.
    x_from : Float[ndarray, "... time"]
        Lower integration limits.
    x_to : Float[ndarray, "... time"]
        Upper integration limits.

    Returns
    -------
    Float[ndarray, "... time"]
        Integral along last axis.
    """
    y_zero = np.where((x_from[..., newaxis] <= x) & (x <= x_to[..., newaxis]), y, 0.0)
    # TODO: handle all-nan spectra
    return np.trapezoid(y_zero, x=x, axis=-1)

kolmogorov_length(eps, molvisc)

Compute the Kolmogorov length scale.

Parameters

eps : Float[ndarray, "any"] TKE dissipation rate in W/kg. molvisc : Float[ndarray, "any"] Kinematic viscosity in m^2/s.

Returns

Float[ndarray, "*any"] Kolmogorov length scale in m.

Source code in turban/utils/util.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def kolmogorov_length(
    eps: Float[ndarray, "*any"],
    molvisc: Float[ndarray, "*any"],
) -> Float[ndarray, "*any"]:
    """Compute the Kolmogorov length scale.

    Parameters
    ----------
    eps : Float[ndarray, "*any"]
        TKE dissipation rate in W/kg.
    molvisc : Float[ndarray, "*any"]
        Kinematic viscosity in m^2/s.

    Returns
    -------
    Float[ndarray, "*any"]
        Kolmogorov length scale in m.
    """
    return (molvisc**3 / eps) ** 0.25

reshape_any_first(P, chunklen, chunkoverlap)

Reshape first dimension into overlapping segments.

Parameters

P : Float[ndarray, "samples ..."] Input array. chunklen : int Chunk length. chunkoverlap : int Chunk overlap length.

Returns

Float[ndarray, "segment inside ..."] Reshaped array, or zero array if dimension is smaller than chunklen.

Source code in turban/utils/util.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def reshape_any_first(
    P: Float[ndarray, "samples ..."],
    chunklen: int,
    chunkoverlap: int,
) -> Float[ndarray, "segment inside ..."]:
    """Reshape first dimension into overlapping segments.

    Parameters
    ----------
    P : Float[ndarray, "samples ..."]
        Input array.
    chunklen : int
        Chunk length.
    chunkoverlap : int
        Chunk overlap length.

    Returns
    -------
    Float[ndarray, "segment inside ..."]
        Reshaped array, or zero array if dimension is smaller than `chunklen`.
    """
    n = P.shape[0]
    if n >= chunklen:
        ii = reshape_overlap_index(chunklen, chunkoverlap, n)
        return P[np.array(ii), ...]
    else:
        return np.zeros((0, chunklen) + P.shape[1:], dtype=float)

reshape_any_last(P, chunklen, chunkoverlap)

Reshape last dimension into overlapping segments.

Parameters

P : Float[ndarray, "... samples"] Input array. chunklen : int Chunk length. chunkoverlap : int Chunk overlap length.

Returns

Float[ndarray, "... segment inside"] Reshaped array, or zero array if dimension is smaller than chunklen.

Source code in turban/utils/util.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
def reshape_any_last(
    P: Float[ndarray, "... samples"],
    chunklen: int,
    chunkoverlap: int,
) -> Float[ndarray, "... segment inside"]:
    """Reshape last dimension into overlapping segments.

    Parameters
    ----------
    P : Float[ndarray, "... samples"]
        Input array.
    chunklen : int
        Chunk length.
    chunkoverlap : int
        Chunk overlap length.

    Returns
    -------
    Float[ndarray, "... segment inside"]
        Reshaped array, or zero array if dimension is smaller than `chunklen`.
    """
    n = P.shape[-1]
    if n >= chunklen:
        ii = reshape_overlap_index(chunklen, chunkoverlap, n)
        return P[..., np.array(ii)]
    else:
        return np.zeros(P.shape[:-1] + (0, chunklen), dtype=float)

reshape_any_nextlast(P, chunklen, chunkoverlap)

Reshape next-to-last dimension into overlapping segments.

Parameters

P : Float[ndarray, "... samples _"] Input array. chunklen : int Chunk length. chunkoverlap : int Chunk overlap length.

Returns

Float[ndarray, "... segment inside _"] Reshaped array, or zero array if dimension is smaller than chunklen.

Source code in turban/utils/util.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def reshape_any_nextlast(
    P: Float[ndarray, "... samples _"],
    chunklen: int,
    chunkoverlap: int,
) -> Float[ndarray, "... segment inside _"]:
    """Reshape next-to-last dimension into overlapping segments.

    Parameters
    ----------
    P : Float[ndarray, "... samples _"]
        Input array.
    chunklen : int
        Chunk length.
    chunkoverlap : int
        Chunk overlap length.

    Returns
    -------
    Float[ndarray, "... segment inside _"]
        Reshaped array, or zero array if dimension is smaller than `chunklen`.
    """
    n = P.shape[-2]
    if n >= chunklen:
        ii = reshape_overlap_index(chunklen, chunkoverlap, n)
        return P[..., np.array(ii), :]
    else:
        return np.zeros(P.shape[:-2] + (0, chunklen, P.shape[-1]), dtype=float)

reshape_halfoverlap_first(y, w)

Reshape first dimension into half-overlapping windows.

Parameters

y : Float[ndarray, "samples ..."] Input array. w : int Window length (even integer).

Returns

Float[ndarray, "segment inside ... "] Reshaped array with half-overlapping windows.

Source code in turban/utils/util.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
def reshape_halfoverlap_first(
    y: Float[ndarray, "samples ..."], w: int
) -> Float[ndarray, "segment inside ... "]:
    """Reshape first dimension into half-overlapping windows.

    Parameters
    ----------
    y : Float[ndarray, "samples ..."]
        Input array.
    w : int
        Window length (even integer).

    Returns
    -------
    Float[ndarray, "segment inside ... "]
        Reshaped array with half-overlapping windows.
    """
    assert w % 2 == 0  # function would work for uneven w but results may be unintuitive
    return y[reshape_overlap_index(w, w // 2, y.shape[0]), ...]

reshape_halfoverlap_last(y, w)

Reshape last dimension into half-overlapping windows.

Parameters

y : Float[ndarray, "... samples"] Input array. w : int Window length (even integer).

Returns

Float[ndarray, "... segment w"] Reshaped array with half-overlapping windows.

Source code in turban/utils/util.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def reshape_halfoverlap_last(
    y: Float[ndarray, "... samples"], w: int
) -> Float[ndarray, "... segment w"]:
    """Reshape last dimension into half-overlapping windows.

    Parameters
    ----------
    y : Float[ndarray, "... samples"]
        Input array.
    w : int
        Window length (even integer).

    Returns
    -------
    Float[ndarray, "... segment w"]
        Reshaped array with half-overlapping windows.
    """
    assert w % 2 == 0  # function would work for uneven w but results may be unintuitive
    return y[..., reshape_overlap_index(w, w // 2, y.shape[-1])]

reshape_overlap_index(w, overlap, N)

Create index array for overlapping window segmentation.

Parameters

w : int Window length (positive integer). overlap : int Overlap length (positive integer), must be less than w. N : int Dimension length to expand (positive integer).

Returns

Int[ndarray, "segment w"] Index array of shape (segment, w) for overlapping windows.

Source code in turban/utils/util.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def reshape_overlap_index(w: int, overlap: int, N: int) -> Int[ndarray, "segment w"]:
    """Create index array for overlapping window segmentation.

    Parameters
    ----------
    w : int
        Window length (positive integer).
    overlap : int
        Overlap length (positive integer), must be less than `w`.
    N : int
        Dimension length to expand (positive integer).

    Returns
    -------
    Int[ndarray, "segment w"]
        Index array of shape (segment, w) for overlapping windows.
    """
    assert w > overlap
    # assert N >= w
    stepsize = w - overlap  # increase for each window
    row_to_row_offset = np.arange(0, N - w + 1, stepsize)  # start of each window
    nrows = len(row_to_row_offset)
    ii = (
        np.zeros((nrows, w)) + np.arange(w)[newaxis, :] + row_to_row_offset[:, newaxis]
    ).astype(int)
    return ii

split_data(data, section_numbers)

Split array into segments based on section markers.

Parameters

data : Num[ndarray, "... time"] Data array to split. section_numbers : Int[ndarray, "... time"] Marker array specifying section membership. Zero markers are excluded.

Returns

dict[np.int_ | int, Num[ndarray, "... time"]] Dictionary mapping section marker to corresponding data segment. Marker 0 is not included.

Source code in turban/utils/util.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def split_data(
    data: Num[ndarray, "... time"],
    section_numbers: Int[ndarray, "... time"],
) -> dict[np.int_ | int, Num[ndarray, "... time"]]:  # sections
    """Split array into segments based on section markers.

    Parameters
    ----------
    data : Num[ndarray, "... time"]
        Data array to split.
    section_numbers : Int[ndarray, "... time"]
        Marker array specifying section membership. Zero markers are excluded.

    Returns
    -------
    dict[np.int_ | int, Num[ndarray, "... time"]]
        Dictionary mapping section marker to corresponding data segment.
        Marker 0 is not included.
    """

    markers = set(section_numbers)
    # sections = select_sections(data_and_bounds)
    sections = {
        marker: data[..., section_numbers == marker]
        for marker in markers
        if marker != 0
    }
    return sections

unwrap_base2(q, maxq=None)

Decompose integers into base-2 bit flags.

Parameters

q : Int[ndarray, "*any"] Integer array to decompose. maxq : int, optional Maximum value to determine bit depth. If None, uses nanmax(q).

Returns

dict[int, Bool[ndarray, "*any"]] Dictionary mapping power-of-2 integers to boolean arrays indicating which bits are set in corresponding q elements.

Source code in turban/utils/util.py
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def unwrap_base2(
    q: Int[ndarray, "*any"],
    maxq: int | None = None,
) -> dict[int, Bool[ndarray, "*any"]]:
    """Decompose integers into base-2 bit flags.

    Parameters
    ----------
    q : Int[ndarray, "*any"]
        Integer array to decompose.
    maxq : int, optional
        Maximum value to determine bit depth. If None, uses `nanmax(q)`.

    Returns
    -------
    dict[int, Bool[ndarray, "*any"]]
        Dictionary mapping power-of-2 integers to boolean arrays indicating
        which bits are set in corresponding `q` elements.
    """
    if maxq is None:
        maxq = np.nanmax(q)

    flag_arr: Bool[ndarray, "*any"] = np.unpackbits(
        q.astype(np.uint8)[np.newaxis], axis=0, bitorder="little"
    ).astype(bool)
    base = [2**i for i in range(int(np.log2(maxq) + 1))]
    flag_dict = {name: val for name, val in zip(base, flag_arr)}
    return flag_dict

Logging (utils/logging.py)

LoggerConfig

Configuration class for loggers

Parameters

handler : logging.Handler | None A handler to process debug messages. If None, a default handler is used. formatter : logging.Formatter | None A formatter to format debug messages. If None, a default formatter is used.

Source code in turban/utils/logging.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class LoggerConfig:
    """Configuration class for loggers

    Parameters
    ----------
    handler : logging.Handler | None
        A handler to process debug messages. If None, a default handler is used.
    formatter : logging.Formatter | None
        A formatter to format debug messages. If None, a default formatter is used.
    """

    def __init__(
        self,
        handler: logging.Handler | None = None,
        formatter: logging.Formatter | None = None,
    ):
        self._handler = handler or logging.StreamHandler()
        self._formatter = formatter or logging.Formatter(
            "{asctime} | {levelname:<7s} | {name} | {funcName:<20s} | {filename:>10s}:{lineno:>4d} | {message}",
            "%Y-%m-%d %H:%M:%S",
            style="{",
        )

    def set_formatter(self, formatter: logging.Formatter) -> None:
        """Sets the formatter for debugging messages

        Parameters
        ----------
        formatter: logging.Formatter
            A formatter object defining how to format debug messages
        """
        self._formatter = formatter

    def set_handler(self, handler: logging.Handler) -> None:
        """Sets the handler

        Parameters
        ----------
        handler: logging.Handler
            A handler object defining how to output debug messages
        """
        self._handler = handler

    @property
    def handler(self) -> logging.Handler:
        """Returns the formatted handler

        Returns
        -------
        logging.Handler
            formatted handler object.
        """
        self._handler.setFormatter(self._formatter)
        return self._handler

handler property

Returns the formatted handler

Returns

logging.Handler formatted handler object.

set_formatter(formatter)

Sets the formatter for debugging messages

Parameters

formatter: logging.Formatter A formatter object defining how to format debug messages

Source code in turban/utils/logging.py
29
30
31
32
33
34
35
36
37
def set_formatter(self, formatter: logging.Formatter) -> None:
    """Sets the formatter for debugging messages

    Parameters
    ----------
    formatter: logging.Formatter
        A formatter object defining how to format debug messages
    """
    self._formatter = formatter

set_handler(handler)

Sets the handler

Parameters

handler: logging.Handler A handler object defining how to output debug messages

Source code in turban/utils/logging.py
39
40
41
42
43
44
45
46
47
def set_handler(self, handler: logging.Handler) -> None:
    """Sets the handler

    Parameters
    ----------
    handler: logging.Handler
        A handler object defining how to output debug messages
    """
    self._handler = handler

LoggerManager

Logger manager

A class to manage various loggers created by the application, as well as in imported modules. The class is implemented as a singleton, which means that multiple instantiations return in fact the same object.

For example:

lm = LoggerManager() LM = LoggerManager() lm is LM # -> True

The class LoggerManager can be called with a debug level (as a string), which sets the default level for all loggers. The behaviour of specific loggers can be modified by the method set_level(..., filter_pattern), using the filter_pattern (regular expression).

Parameters

default_level: str | None default debug level. This will override any default level set previously. If not specified, the singleton is return without modifying the default level.

Source code in turban/utils/logging.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class LoggerManager:
    """Logger manager

    A class to manage various loggers created by the application, as
    well as in imported modules.  The class is implemented as a
    singleton, which means that multiple instantiations return in fact
    the same object.

    For example:

    >>> lm = LoggerManager()
    >>> LM = LoggerManager()
    >>> lm is LM # -> True

    The class LoggerManager can be called with a debug level (as a
    string), which sets the default level for all loggers. The
    behaviour of specific loggers can be modified by the method
    set_level(..., filter_pattern), using the filter_pattern (regular
    expression).

    Parameters
    ----------
    default_level: str | None
        default debug level. This will override any default level set
        previously. If not specified, the singleton is return without
        modifying the default level.

    """

    _instance: Self | None = None
    _loggers: dict[str, logging.Logger] = {}
    _levels: dict[str, int] = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
    }
    _log_levels: list[tuple[re.Pattern[str] | None, int]] = []

    def __new__(cls, default_level: str | None = None) -> Self:
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        if not default_level is None:
            cls._instance.set_level(default_level)
        return cls._instance

    def get_logger(
        self, name: str, config: LoggerConfig | None = None
    ) -> logging.Logger:
        """Returns a logger created previously with given name, or,
        creates a new logger, if no such logger exists.

        Parameters
        ----------
        name : str
            name of logger (for example __name__)
        config: LoggerConfig | None
            Configuration object that specifies the handlers and
            formatters. If None, a default LoggerConfig is used.

        Returns
        -------
        logging.Logger
            A logger object.
        """
        try:
            logger = self._loggers[name]
        except KeyError:
            logger = logging.getLogger(name)
            config = config or LoggerConfig()
            logger.addHandler(config.handler)
            self._loggers[name] = logger
        else:
            if not config is None:
                logger.info(
                    f"Supplied configuration is not applied to an already configured logger."
                )
        # makes any new logger's level set to any requirements set already.
        for _log_level in self._log_levels:
            self._set_level(*_log_level, name, logger)
        return logger

    def _set_level(
        self,
        regex: re.Pattern[str] | None,
        level: int,
        logger_name: str,
        logger: logging.Logger,
    ) -> None:
        if regex:
            if regex.search(logger_name):
                logger.setLevel(level)
        else:
            logger.setLevel(level)

    def set_level(self, level: int | str, filter_pattern: str | None = None) -> None:
        """Set the level for all or some loggers

        Parameters
        ----------
        level: int | str
            debug level (for example logging.DEBUG or "debug")
        filter_pattern : str | None

            A regular expression to limit the setting to matching
            loggers. If None, all loggers' levels are set.
        """
        if isinstance(level, str):
            _level = self._levels[level.lower()]
        else:
            _level = level

        if not filter_pattern is None:
            regex = re.compile(filter_pattern)
        else:
            regex = None
        for k, v in self._loggers.items():
            self._set_level(regex, _level, k, v)
        self._log_levels.append((regex, _level))

    def list_loggers(self) -> list[str]:
        """Lists all loggers created under the auspicien of the LoggerManager

        Returns
        -------
        list[str]
            list of names of all loggers, under management of the LoggerManager

        See also list_all_loggers() for listing all loggers known in this session.
        """
        return [k for k, v in self._loggers.items()]

    def list_all_loggers(self) -> list[str]:
        """Lists all loggers created in this session

        Returns
        -------
        list[str]
            list of names of all loggers known in this session

        See also list_loggers() for listing all loggers managed by the LoggerManager.
        """
        loggers = [
            name
            for name, logger in logging.Logger.manager.loggerDict.items()
            if isinstance(logger, logging.Logger)
        ]
        return loggers

get_logger(name, config=None)

Returns a logger created previously with given name, or, creates a new logger, if no such logger exists.

Parameters

name : str name of logger (for example name) config: LoggerConfig | None Configuration object that specifies the handlers and formatters. If None, a default LoggerConfig is used.

Returns

logging.Logger A logger object.

Source code in turban/utils/logging.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def get_logger(
    self, name: str, config: LoggerConfig | None = None
) -> logging.Logger:
    """Returns a logger created previously with given name, or,
    creates a new logger, if no such logger exists.

    Parameters
    ----------
    name : str
        name of logger (for example __name__)
    config: LoggerConfig | None
        Configuration object that specifies the handlers and
        formatters. If None, a default LoggerConfig is used.

    Returns
    -------
    logging.Logger
        A logger object.
    """
    try:
        logger = self._loggers[name]
    except KeyError:
        logger = logging.getLogger(name)
        config = config or LoggerConfig()
        logger.addHandler(config.handler)
        self._loggers[name] = logger
    else:
        if not config is None:
            logger.info(
                f"Supplied configuration is not applied to an already configured logger."
            )
    # makes any new logger's level set to any requirements set already.
    for _log_level in self._log_levels:
        self._set_level(*_log_level, name, logger)
    return logger

list_all_loggers()

Lists all loggers created in this session

Returns

list[str] list of names of all loggers known in this session

See also list_loggers() for listing all loggers managed by the LoggerManager.

Source code in turban/utils/logging.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def list_all_loggers(self) -> list[str]:
    """Lists all loggers created in this session

    Returns
    -------
    list[str]
        list of names of all loggers known in this session

    See also list_loggers() for listing all loggers managed by the LoggerManager.
    """
    loggers = [
        name
        for name, logger in logging.Logger.manager.loggerDict.items()
        if isinstance(logger, logging.Logger)
    ]
    return loggers

list_loggers()

Lists all loggers created under the auspicien of the LoggerManager

Returns

list[str] list of names of all loggers, under management of the LoggerManager

See also list_all_loggers() for listing all loggers known in this session.

Source code in turban/utils/logging.py
182
183
184
185
186
187
188
189
190
191
192
def list_loggers(self) -> list[str]:
    """Lists all loggers created under the auspicien of the LoggerManager

    Returns
    -------
    list[str]
        list of names of all loggers, under management of the LoggerManager

    See also list_all_loggers() for listing all loggers known in this session.
    """
    return [k for k, v in self._loggers.items()]

set_level(level, filter_pattern=None)

Set the level for all or some loggers

Parameters

level: int | str debug level (for example logging.DEBUG or "debug") filter_pattern : str | None

A regular expression to limit the setting to matching
loggers. If None, all loggers' levels are set.
Source code in turban/utils/logging.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def set_level(self, level: int | str, filter_pattern: str | None = None) -> None:
    """Set the level for all or some loggers

    Parameters
    ----------
    level: int | str
        debug level (for example logging.DEBUG or "debug")
    filter_pattern : str | None

        A regular expression to limit the setting to matching
        loggers. If None, all loggers' levels are set.
    """
    if isinstance(level, str):
        _level = self._levels[level.lower()]
    else:
        _level = level

    if not filter_pattern is None:
        regex = re.compile(filter_pattern)
    else:
        regex = None
    for k, v in self._loggers.items():
        self._set_level(regex, _level, k, v)
    self._log_levels.append((regex, _level))

get_logger(name)

Get or create a named logger with the configured stream handler.

If the logger has no handlers, the pre-configured handler with custom formatter is attached automatically.

Parameters

name : str The name of the logger to retrieve or create.

Returns

logging.Logger The logger instance with the configured handler attached if needed.

Source code in turban/utils/logging.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def get_logger(name: str) -> logging.Logger:
    """Get or create a named logger with the configured stream handler.

    If the logger has no handlers, the pre-configured handler with custom
    formatter is attached automatically.

    Parameters
    ----------
    name : str
        The name of the logger to retrieve or create.

    Returns
    -------
    logging.Logger
        The logger instance with the configured handler attached if needed.

    """
    manager = LoggerManager()
    logger = manager.get_logger(name)
    return logger

set_turban_loglevel(level, pattern='turban')

Set the log level for all loggers matching a name pattern.

Parameters

level : int or str The log level to set. Can be a numeric level (e.g. logging.INFO or 20) or a string (e.g. "INFO"). pattern : str, optional Logger name prefix to match. Only loggers whose names start with this pattern will be affected. Default is "turban".

Source code in turban/utils/logging.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def set_turban_loglevel(level: int | str, pattern: str = "turban") -> None:
    """Set the log level for all loggers matching a name pattern.

    Parameters
    ----------
    level : int or str
        The log level to set. Can be a numeric level (e.g. logging.INFO or 20)
        or a string (e.g. "INFO").
    pattern : str, optional
        Logger name prefix to match. Only loggers whose names start with this
        pattern will be affected. Default is "turban".

    """
    manager = LoggerManager()
    regex_pattern = rf"^{pattern}.*"
    manager.set_level(level, regex_pattern)

Spectral estimation (utils/spectra.py)

apply_var_conserve(psi, dfreq, signal_reshape)

Apply variance conservation correction to power spectral density.

Scales psi in-place such that its integral along the frequency axis equals the variance of the input signal.

Parameters

psi : ndarray, shape (*any, freq) Power spectral density. Modified in-place. dfreq : float Frequency resolution. signal_reshape : ndarray, shape (nshear, chunks, freq) Reshaped signal used for variance calculation.

Returns

ndarray, shape (*any,) Correction factors applied to psi.

Source code in turban/utils/spectra.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def apply_var_conserve(
    psi: Float[ndarray, "*any freq"],
    dfreq: float,
    signal_reshape: Float[ndarray, "nshear chunks freq"],
) -> Float[ndarray, "*any"]:
    """Apply variance conservation correction to power spectral density.

    Scales `psi` in-place such that its integral along the frequency axis equals
    the variance of the input signal.

    Parameters
    ----------
    psi : ndarray, shape (*any, freq)
        Power spectral density. Modified in-place.
    dfreq : float
        Frequency resolution.
    signal_reshape : ndarray, shape (nshear, chunks, freq)
        Reshaped signal used for variance calculation.

    Returns
    -------
    ndarray, shape (*any,)
        Correction factors applied to psi.
    """
    intpsi = psi[..., 1:].sum(axis=-1) * dfreq  # disregard first frequency
    signal_reshape = np.ascontiguousarray(
        signal_reshape
    )  # performance enhancement for np.var
    corr_factor = np.var(signal_reshape, axis=-1) / intpsi
    psi *= corr_factor[..., newaxis]
    return corr_factor

spectrum(x, sampfreq, segment_length=None, segment_overlap=None, chunk_length=None, chunk_overlap=None, section_number=None, reshape_index=None, y=None, **estimator_kwarg)

Compute power spectral density or cross spectral density from time series.

Parameters

x : ndarray, shape (... time_fast) Input time series. sampfreq : float Sampling frequency in Hz. segment_length : int, optional FFT segment length in samples. segment_overlap : int, optional FFT segment overlap in samples. chunk_length : int, optional Dissipation chunk length. chunk_overlap : int, optional Dissipation chunk overlap. section_number : ndarray of int, shape (time_fast,), optional Section markers. reshape_index : ndarray of int, shape (diss_chunk, fft_chunk, segment_length), optional Precomputed reshaping index. If not provided, is calculated from other parameters. y : ndarray, shape (... time_fast), optional If provided, compute cross spectral density instead of PSD. **estimator_kwarg Additional keyword arguments passed to welch or csd.

Returns

psi : ndarray, shape (... chunk freq) Power spectral density (PSD) or cross spectral density (CSD). freq : ndarray, shape (freq,) Frequency array.

Source code in turban/utils/spectra.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def spectrum(
    x: Float[ndarray, "... time_fast"],
    sampfreq: float,
    segment_length: int | None = None,
    segment_overlap: int | None = None,
    chunk_length: int | None = None,
    chunk_overlap: int | None = None,
    section_number: Int[ndarray, "time_fast"] | None = None,
    reshape_index: Int[ndarray, "diss_chunk fft_chunk segment_length"] | None = None,
    y: (
        Float[ndarray, "... time_fast"] | None
    ) = None,  # if not None, return cross spectral density
    **estimator_kwarg,
) -> tuple[
    Num[ndarray, "... chunk freq"],
    Float[ndarray, "freq"],  # frequencies
]:
    """Compute power spectral density or cross spectral density from time series.

    Parameters
    ----------
    x : ndarray, shape (... time_fast)
        Input time series.
    sampfreq : float
        Sampling frequency in Hz.
    segment_length : int, optional
        FFT segment length in samples.
    segment_overlap : int, optional
        FFT segment overlap in samples.
    chunk_length : int, optional
        Dissipation chunk length.
    chunk_overlap : int, optional
        Dissipation chunk overlap.
    section_number : ndarray of int, shape (time_fast,), optional
        Section markers.
    reshape_index : ndarray of int, shape (diss_chunk, fft_chunk, segment_length), optional
        Precomputed reshaping index. If not provided, is calculated from other parameters.
    y : ndarray, shape (... time_fast), optional
        If provided, compute cross spectral density instead of PSD.
    **estimator_kwarg
        Additional keyword arguments passed to welch or csd.

    Returns
    -------
    psi : ndarray, shape (... chunk freq)
        Power spectral density (PSD) or cross spectral density (CSD).
    freq : ndarray, shape (freq,)
        Frequency array.
    """
    if reshape_index is None:
        section_number = cast(Int[ndarray, "time_fast"], section_number)
        chunk_length = cast(int, chunk_length)
        chunk_overlap = cast(int, chunk_overlap)
        reshape_index = get_chunking_index(
            section_number,
            (chunk_length, chunk_overlap),
        )
    else:
        segment_length = reshape_index.shape[-1]

    kwarg = dict(
        axis=-1,
        fs=sampfreq,
        nperseg=segment_length,
        noverlap=segment_overlap,
        scaling="density",
    )

    xr = x[..., reshape_index]  # reshape to chunk  length windows

    if y is None:
        freq, psi = welch(xr, **kwarg, **estimator_kwarg)
    else:
        yr = y[..., reshape_index]
        freq, psi = csd(xr, yr, **kwarg, **estimator_kwarg)

    dfreq = freq[1] - freq[0]
    # by using scaling 'density' above, variance should already be approximately conserved,
    # but now we correct for PSD estimator bias manually:
    _ = apply_var_conserve(psi, dfreq, xr)

    return psi, freq

CTD utilities (utils/ctd.py)

calc_ctd(C, T, P, lon, lat)

Parameters

C : conductivity (mS/cm)
T : in situ temperature (deg C, ITS-90)
P : Pressure (dbar)
lon, lat: degrees East/North

Returns

sigma0 : potential density anomaly (kg/m3)
SA : Absolute salinity, g/kg
CT : conservative temperature (deg C)
Source code in turban/utils/ctd.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def calc_ctd(
    C: Float[ndarray, "time"],
    T: Float[ndarray, "time"],
    P: Float[ndarray, "time"],
    lon: float,
    lat: float,
) -> tuple[
    Float[ndarray, "time"],
    Float[ndarray, "time"],
    Float[ndarray, "time"],
]:
    """
    Parameters
    ----------
        C : conductivity (mS/cm)
        T : in situ temperature (deg C, ITS-90)
        P : Pressure (dbar)
        lon, lat: degrees East/North

    Returns
    -------
        sigma0 : potential density anomaly (kg/m3)
        SA : Absolute salinity, g/kg
        CT : conservative temperature (deg C)
    """
    SP = gsw.SP_from_C(C, T, P)
    SA = gsw.SA_from_SP(SP, P, lon, lat)
    CT = gsw.CT_from_t(SA, T, P)
    sigma0 = gsw.sigma0(SA, CT)
    return (sigma0, SA, CT)

fofonoff_filt(x, tau)

Correct time series for lagged sensor response. For MSS temperature, an OK default value is 55 but this should be determined individually

Source code in turban/utils/ctd.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def fofonoff_filt(x: Float[ndarray, "time"], tau: int) -> Float[ndarray, "time"]:
    """Correct time series for lagged sensor response. For MSS temperature, an
    OK default value is 55 but this should be determined individually"""
    w = 200
    w2 = w // 2
    # pad with nans
    y: Float[ndarray, "agg window"] = reshape_halfoverlap_last(x, w)
    # dummy time vector in seconds
    samples = np.arange(w)
    t: Int[ndarray, "agg2"] = np.arange(0, len(x) + 1, w2)

    dxdt_agg: Float[ndarray, "agg2"] = np.r_[
        np.polyfit(x=samples[:w2], y=y[0, :w2], deg=1)[0],  # first bit
        np.polyfit(x=samples, y=y.transpose(), deg=1)[0, :],
        np.polyfit(x=samples[:w2], y=y[-1, -w2:], deg=1)[0],  # last bit
    ]
    dxdt: Float[ndarray, "time"] = np.interp(np.arange(len(x)), t, dxdt_agg)
    return (dxdt * tau) + x

File paths and data download (utils/filepaths.py)

FilePaths

FilePaths

A class for managing downloading data files for test purposes from a remote server.

Intended use:

filepaths = FilePaths() atomix_benchmark_baltic_fpath = filepaths.add("data/process/shear/MSS_Baltic.nc")

and more paths you want to speficy

filepaths.download_data_if_necessary()

Source code in turban/utils/filepaths.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class FilePaths:
    """FilePaths

    A class for managing downloading data files for test purposes from a remote server.

    Intended use:

    >>> filepaths = FilePaths()
    >>> atomix_benchmark_baltic_fpath = filepaths.add("data/process/shear/MSS_Baltic.nc")
    >>> # and more paths you want to speficy
    >>> filepaths.download_data_if_necessary()

    """

    def __init__(self) -> None:
        self.filepaths: list[str] = []
        self.top_level: Path = Path(__file__).resolve().parent.parent.parent
        self.url: str = ""
        self._is_data_downloaded = False

    def add(self, path: str | Path) -> str:
        """Adds given string or Path object to the registry.

        Parameters
        ----------
        path : str or Path
            Name of a path to add to the registry

        Returns: str
            Path name as a string
        """
        file_path = self.top_level / path
        self.filepaths.append(file_path)
        return str(file_path)

    def auto_download_data_if_necessary(self) -> None:
        '''Download files if one or more data are missing, if and only if, the environment
           variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is set to 1.
        '''
        flag = os.getenv(TURBAN_AUTO_DOWNLOAD_TEST_FILES)
        if not flag is None and flag.strip()=='1':
            logger.info(f"Environment variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is set. Checking need for download.")
            self.download_data_if_necessary()
        else:
            logger.info(f"Environment variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is not set. Not downloading automatically.")

    def is_download_required(self) -> bool:
        download_required = False
        for p in self.filepaths:
            if not p.exists():
                logger.info(f"File {str(p)} is missing.")
                download_required = True
            else:
                logger.info(f"File {str(p)} is present.")
        return download_required

    def download_data_if_necessary(self) -> None:
        """Download if one or more data files are missing

        This method checks if all required files are existing on the current system,
        and in case one or more are failing, the data files are retrieved from a central
        public data file server.

        Data download, and extraction are done in a temporary directoy, before being moved
        to the expected location.
        """
        logger.debug(f"is already downloaded: {self._is_data_downloaded}")
        if self._is_data_downloaded:
            return
        if self.is_download_required():
            self.download_data()
            self._is_data_downloaded = True # if downloaded this session, we will not do so again.

    def download_data(self) -> None:
        """Downloads test and benchmark data files from external server"""
        logger.info("Downloading...")
        url = self.url or DATA_DOWNLOAD_LINK
        with tempfile.TemporaryDirectory(prefix="turban_", dir=None) as tmpdir:
            dest_dir = Path(tmpdir)
            zip_file = self.download_as_zip(url, dest_dir)
            logger.debug(f"Downloaded {zip_file}.")
            self.safe_extract_zip(zip_file, dest_dir)
            copytree(dest_dir / ARCHIVE_NAME, self.top_level)
        logger.info("Download completed.")

    def download_as_zip(self, url: str, dest_dir: str | Path, timeout: int = 30) -> str:
        """Download URL and write as a zip file

        Parameters
        ----------
        url : str
            url for remote site
        dest_dir : str | Path
            path where to download the zip file

        Returns
        -------
        str:
            name of zip file
        """
        headers = {"User-Agent": "curl/7.88.1"}
        os.makedirs(dest_dir, exist_ok=True)
        local_name = str(Path(dest_dir) / "data.zip")
        with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            total = int(r.headers.get("content-length", 0))
            tmp = local_name + ".part"
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            os.replace(tmp, local_name)
        return local_name

    def safe_extract_zip(self, zip_path, target_dir) -> None:
        """
        Extract zip_path into target_dir while preventing path traversal attacks. (AI generated method)

        Parameters
        ----------
        zip_path : str | Path
            path to zip file to be extracted
        target_dir : str | Path
            path to directory into which the zip file should be extracted.
        """
        zip_path = Path(zip_path)
        target_dir = Path(target_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        with ZipFile(zip_path, "r") as z:
            for member in z.namelist():
                member_path = target_dir / member
                # Prevent path traversal: ensure the resolved path is inside target_dir
                if not str(member_path.resolve()).startswith(
                    str(target_dir.resolve()) + os.sep
                ):
                    raise RuntimeError(f"Unsafe zip entry detected: {member}")
                z.extractall(path=target_dir)

add(path)

Adds given string or Path object to the registry.

Parameters

path : str or Path Name of a path to add to the registry

str

Type Description
str

Path name as a string

Source code in turban/utils/filepaths.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def add(self, path: str | Path) -> str:
    """Adds given string or Path object to the registry.

    Parameters
    ----------
    path : str or Path
        Name of a path to add to the registry

    Returns: str
        Path name as a string
    """
    file_path = self.top_level / path
    self.filepaths.append(file_path)
    return str(file_path)

auto_download_data_if_necessary()

Download files if one or more data are missing, if and only if, the environment variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is set to 1.

Source code in turban/utils/filepaths.py
69
70
71
72
73
74
75
76
77
78
def auto_download_data_if_necessary(self) -> None:
    '''Download files if one or more data are missing, if and only if, the environment
       variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is set to 1.
    '''
    flag = os.getenv(TURBAN_AUTO_DOWNLOAD_TEST_FILES)
    if not flag is None and flag.strip()=='1':
        logger.info(f"Environment variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is set. Checking need for download.")
        self.download_data_if_necessary()
    else:
        logger.info(f"Environment variable TURBAN_AUTO_DOWNLOAD_TEST_FILES is not set. Not downloading automatically.")

download_as_zip(url, dest_dir, timeout=30)

Download URL and write as a zip file

Parameters

url : str url for remote site dest_dir : str | Path path where to download the zip file

Returns

str: name of zip file

Source code in turban/utils/filepaths.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def download_as_zip(self, url: str, dest_dir: str | Path, timeout: int = 30) -> str:
    """Download URL and write as a zip file

    Parameters
    ----------
    url : str
        url for remote site
    dest_dir : str | Path
        path where to download the zip file

    Returns
    -------
    str:
        name of zip file
    """
    headers = {"User-Agent": "curl/7.88.1"}
    os.makedirs(dest_dir, exist_ok=True)
    local_name = str(Path(dest_dir) / "data.zip")
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length", 0))
        tmp = local_name + ".part"
        with open(tmp, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        os.replace(tmp, local_name)
    return local_name

download_data()

Downloads test and benchmark data files from external server

Source code in turban/utils/filepaths.py
107
108
109
110
111
112
113
114
115
116
117
def download_data(self) -> None:
    """Downloads test and benchmark data files from external server"""
    logger.info("Downloading...")
    url = self.url or DATA_DOWNLOAD_LINK
    with tempfile.TemporaryDirectory(prefix="turban_", dir=None) as tmpdir:
        dest_dir = Path(tmpdir)
        zip_file = self.download_as_zip(url, dest_dir)
        logger.debug(f"Downloaded {zip_file}.")
        self.safe_extract_zip(zip_file, dest_dir)
        copytree(dest_dir / ARCHIVE_NAME, self.top_level)
    logger.info("Download completed.")

download_data_if_necessary()

Download if one or more data files are missing

This method checks if all required files are existing on the current system, and in case one or more are failing, the data files are retrieved from a central public data file server.

Data download, and extraction are done in a temporary directoy, before being moved to the expected location.

Source code in turban/utils/filepaths.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def download_data_if_necessary(self) -> None:
    """Download if one or more data files are missing

    This method checks if all required files are existing on the current system,
    and in case one or more are failing, the data files are retrieved from a central
    public data file server.

    Data download, and extraction are done in a temporary directoy, before being moved
    to the expected location.
    """
    logger.debug(f"is already downloaded: {self._is_data_downloaded}")
    if self._is_data_downloaded:
        return
    if self.is_download_required():
        self.download_data()
        self._is_data_downloaded = True # if downloaded this session, we will not do so again.

safe_extract_zip(zip_path, target_dir)

Extract zip_path into target_dir while preventing path traversal attacks. (AI generated method)

Parameters

zip_path : str | Path path to zip file to be extracted target_dir : str | Path path to directory into which the zip file should be extracted.

Source code in turban/utils/filepaths.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def safe_extract_zip(self, zip_path, target_dir) -> None:
    """
    Extract zip_path into target_dir while preventing path traversal attacks. (AI generated method)

    Parameters
    ----------
    zip_path : str | Path
        path to zip file to be extracted
    target_dir : str | Path
        path to directory into which the zip file should be extracted.
    """
    zip_path = Path(zip_path)
    target_dir = Path(target_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    with ZipFile(zip_path, "r") as z:
        for member in z.namelist():
            member_path = target_dir / member
            # Prevent path traversal: ensure the resolved path is inside target_dir
            if not str(member_path.resolve()).startswith(
                str(target_dir.resolve()) + os.sep
            ):
                raise RuntimeError(f"Unsafe zip entry detected: {member}")
            z.extractall(path=target_dir)

Plot generic helpers (utils/plot/generic.py)

plot_quality_metric(ax, time, q, q_codes=None, **kwarg)

Plot integer quality flags as a pseudocolor mesh with a flag axis.

Parameters

ax : mpl.axes.Axes Matplotlib axes to plot on. time : ndarray, shape (any, time) Time coordinates for the x-axis. q : ndarray of int, shape (any, time) Quality flags as integers; each bit represents a flag condition. q_codes : dict, optional Mapping from bit-flag values (powers of 2) to human-readable names. If None, flags are labeled by their numeric value. **kwarg Additional keyword arguments passed to unwrap_base2.

Source code in turban/utils/plot/generic.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def plot_quality_metric(
    ax,
    time: Shaped[ndarray, "*any time"],
    q: Int[ndarray, "*any time"],
    q_codes: dict | None = None,
    **kwarg,
):
    """Plot integer quality flags as a pseudocolor mesh with a flag axis.

    Parameters
    ----------
    ax : mpl.axes.Axes
        Matplotlib axes to plot on.
    time : ndarray, shape (*any, time)
        Time coordinates for the x-axis.
    q : ndarray of int, shape (*any, time)
        Quality flags as integers; each bit represents a flag condition.
    q_codes : dict, optional
        Mapping from bit-flag values (powers of 2) to human-readable names.
        If None, flags are labeled by their numeric value.
    **kwarg
        Additional keyword arguments passed to ``unwrap_base2``.
    """
    green_red_cmap = LinearSegmentedColormap.from_list("GreenRedCmap", ["green", "red"])

    flag_dict = unwrap_base2(q, **kwarg)

    q = np.stack(list(flag_dict.values()), axis=0)
    if q_codes is None:
        flag = list(flag_dict.keys())
    else:
        flag = [q_codes[k] for k in flag_dict.keys()]

    ax.pcolormesh(time, range(len(flag)), q, cmap=green_red_cmap)
    ax.set_yticks(range(len(flag)), flag)

plot_section_numbers(axs, time, section_num)

Plot contiguous non-zero sections as vertical spans on multiple axes.

Parameters

axs : Iterable[mpl.axes.Axes] Collection of matplotlib axes to draw on. time : ndarray, shape (n,) Time coordinates for the x-axis. section_num : ndarray of int, shape (n,) Section markers; non-zero values indicate sections to highlight.

Source code in turban/utils/plot/generic.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def plot_section_numbers(
    axs: Iterable[mpl.axes.Axes],
    time: Shaped[np.ndarray, "n"],
    section_num: Int[np.ndarray, "n"],
):
    """Plot contiguous non-zero sections as vertical spans on multiple axes.

    Parameters
    ----------
    axs : Iterable[mpl.axes.Axes]
        Collection of matplotlib axes to draw on.
    time : ndarray, shape (n,)
        Time coordinates for the x-axis.
    section_num : ndarray of int, shape (n,)
        Section markers; non-zero values indicate sections to highlight.
    """
    # Plot section_number as solid bars where non-zero
    non_zero_mask = section_num != 0

    # Find continuous sections
    changes = np.diff(np.concatenate(([False], non_zero_mask, [False])).astype(int))
    starts = np.where(changes == 1)[0]
    ends = np.where(changes == -1)[0]

    for ax in axs:
        for start, end in zip(starts, ends):
            ax.axvspan(
                time[start],
                time[end - 1],
                alpha=0.1,
                color="green",
            )

Plot shear processing (utils/plot/shear.py)

plot(*data, subset=None)

Make all possible plots from any number of supplied data.

Source code in turban/utils/plot/shear.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def plot(*data: Any, subset: SubsetSpec | None = None):
    """Make all possible plots from any number of supplied data."""
    plot_map = {
        1: plot_level1,
        2: plot_level2,
        3: plot_level3,
        4: plot_level4,
    }
    level_data_items = [level for item in data for level in _to_levels(item)]
    level_data_levels = [item._level for item in level_data_items]

    if len(set(level_data_levels)) < len(level_data_levels):
        raise ValueError("Some levels occur more than once; do not know what to do")

    out = []
    for level, plotfunc in plot_map.items():
        if level in level_data_levels:
            out.append(plotfunc(*level_data_items, subset=subset))

    return out

plot_level1(*data, subset=None)

Plot Level 1 data with shear and senspeed panels.

Parameters

*data : Any ShearLevel1, ShearProcessing, xr.Dataset, or xr.DataTree objects. subset : list of tuple, optional List of (variable, vmin, vmax) bounds for subsetting. If None, no subsetting is applied.

Returns

tuple of (matplotlib.figure.Figure, ndarray of Axes) Figure with panels for shear (one line per sensor), senspeed, and any remaining data variables.

Source code in turban/utils/plot/shear.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def plot_level1(*data: Any, subset: SubsetSpec | None = None):
    """Plot Level 1 data with shear and senspeed panels.

    Parameters
    ----------
    *data : Any
        ShearLevel1, ShearProcessing, xr.Dataset, or xr.DataTree objects.
    subset : list of tuple, optional
        List of ``(variable, vmin, vmax)`` bounds for subsetting. If None, no
        subsetting is applied.

    Returns
    -------
    tuple of (matplotlib.figure.Figure, ndarray of Axes)
        Figure with panels for shear (one line per sensor), senspeed, and any
        remaining data variables.
    """
    levels = _parse_level_inputs(*data)

    ds = _clip(_to_dataset(levels.get(1)), subset)

    data_vars = set(ds.data_vars) - {"section_number", "shear", "senspeed"}
    n_panels = len(data_vars)

    fig, axs = plt.subplots(n_panels + 2, 1, figsize=(12, 8), sharex=True)

    # Panel 1: Shear data
    ax = axs[0]
    for i in ds["nshear"]:
        ds.isel(nshear=i).shear.plot(ax=ax, x="time")
    ax.set_ylabel("Shear")
    ax.set_xlabel("")
    ax.legend([f"Shear {i+1:1d}" for i in ds["nshear"]])

    # Panel 2: Senspeed
    ax = axs[1]
    ds["senspeed"].plot(ax=ax, x="time", color="k", linewidth=1)
    ax.set_ylabel("Senspeed")
    ax.set_xlabel("Time")

    # Remaining panels
    for ax, dvar in zip(axs[2:], data_vars):
        ds[dvar].plot(ax=ax, x="time", color="k", linewidth=1)

    for ax in axs:
        ax.grid(True, alpha=0.3)

    plot_section_numbers(list(axs), ds.time.values, ds.section_number.values)

    fig.suptitle(f"Level 1{_subset_suffix(subset)}")
    plt.tight_layout(rect=(0, 0, 1, 0.96))
    return fig, axs

plot_level2(*data, subset=None)

Plot Level 2 data with cleaned shear and despike iteration counts.

Parameters

*data : Any ShearLevel2 required; ShearLevel1 optional (for overlay comparison). Also accepts ShearProcessing, xr.Dataset, or xr.DataTree objects. subset : list of tuple, optional List of (variable, vmin, vmax) bounds for subsetting. If None, no subsetting is applied.

Returns

tuple of (matplotlib.figure.Figure, ndarray of Axes) Figure with 2×nshear panels: cleaned vs raw shear overlay per sensor, and despike iteration count per sensor.

Source code in turban/utils/plot/shear.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def plot_level2(*data: Any, subset: SubsetSpec | None = None):
    """Plot Level 2 data with cleaned shear and despike iteration counts.

    Parameters
    ----------
    *data : Any
        ShearLevel2 required; ShearLevel1 optional (for overlay comparison).
        Also accepts ShearProcessing, xr.Dataset, or xr.DataTree objects.
    subset : list of tuple, optional
        List of ``(variable, vmin, vmax)`` bounds for subsetting. If None, no
        subsetting is applied.

    Returns
    -------
    tuple of (matplotlib.figure.Figure, ndarray of Axes)
        Figure with 2×nshear panels: cleaned vs raw shear overlay per sensor,
        and despike iteration count per sensor.
    """
    levels = _parse_level_inputs(*data)

    ds = _clip(_to_dataset(levels.get(2)), subset)
    valid_section = ds["section_number"] != 0
    data_l1 = levels.get(1, None)
    ds1 = None
    if data_l1 is not None:
        ds1 = _clip(_to_dataset(data_l1), subset)
        valid_section_l1 = ds1["section_number"] != 0
    nshear = len(ds.nshear)

    fig, axs = plt.subplots(nshear * 2, 1, figsize=(12, 4 + nshear * 2), sharex=True)

    for i, (ax1, ax2) in enumerate(zip(axs[::2], axs[1::2])):
        # Panel 1: Shear data
        ax = ax1
        if ds1 is not None:
            ds1.isel(nshear=i).shear.where(valid_section_l1).plot(
                ax=ax, x="time", c="k"
            )
        ds.isel(nshear=i).shear.where(valid_section).plot(ax=ax, x="time", c="r")
        ax.set_title(f"Shear {i+1:1d}")

        # Panel 2:
        ax = ax2
        ds["num_despike_iter"].isel(nshear=i).where(valid_section).plot(
            ax=ax, x="time", color="k", linewidth=1
        )
        ax.set_xlabel("Time")
        ax.set_title(f"Shear {i+1:1d}")

    for ax in axs:
        ax.grid(True, alpha=0.3)

    plot_section_numbers(axs, ds.time.values, ds.section_number.values)

    fig.suptitle(f"Level 2{_subset_suffix(subset)}")
    plt.tight_layout(rect=(0, 0, 1, 0.96))
    return fig, axs

plot_level3(*data, subset=None)

Plot shear spectra and time series with optional model spectrum overlay.

Parameters

*data : Any ShearLevel3 required; ShearLevel4 optional (for model spectrum overlay). Also accepts ShearProcessing, xr.Dataset, or xr.DataTree objects. subset : list of tuple, optional List of (variable, vmin, vmax) bounds for subsetting. If None, no subsetting is applied.

Returns

tuple of (matplotlib.figure.Figure, list of Axes) Figure with shear spectra panels (one per sensor) and time series panels for senspeed and auxiliary variables. Model Nasmyth spectra are overlaid if Level 4 data is provided.

Source code in turban/utils/plot/shear.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def plot_level3(*data: Any, subset: SubsetSpec | None = None):
    """Plot shear spectra and time series with optional model spectrum overlay.

    Parameters
    ----------
    *data : Any
        ShearLevel3 required; ShearLevel4 optional (for model spectrum overlay).
        Also accepts ShearProcessing, xr.Dataset, or xr.DataTree objects.
    subset : list of tuple, optional
        List of ``(variable, vmin, vmax)`` bounds for subsetting. If None, no
        subsetting is applied.

    Returns
    -------
    tuple of (matplotlib.figure.Figure, list of Axes)
        Figure with shear spectra panels (one per sensor) and time series panels
        for senspeed and auxiliary variables. Model Nasmyth spectra are overlaid
        if Level 4 data is provided.
    """
    levels = _parse_level_inputs(*data)
    data_l3 = levels.get(3)
    data_l4 = levels.get(4, None)

    if data_l3 is None:
        raise ValueError("plot_level3 requires Level 3 data")

    ds3 = _clip(_to_dataset(data_l3), subset)
    ds4 = _clip(_to_dataset(data_l4), subset) if data_l4 is not None else None
    nshear = len(ds3.nshear)

    aux_keys = set((getattr(data_l3, "_aux_data", {}) or {}).keys())
    aux_vars = [
        var for var in ds3.data_vars if var in aux_keys and "time" in ds3[var].dims
    ]
    ts_vars = ["senspeed", *aux_vars]

    n_ts = len(ts_vars)
    fig = plt.figure(figsize=(4 * nshear, 3 + 2.2 * (1 + n_ts)))
    gs = fig.add_gridspec(nrows=1 + n_ts, ncols=nshear)

    spectra_axes = [fig.add_subplot(gs[0, i]) for i in range(nshear)]
    ts_axes = [fig.add_subplot(gs[1 + i, :]) for i in range(n_ts)]

    for i, ax in enumerate(spectra_axes):
        # Plot Pk vs wavenumber with time overlaid
        ds_sensor = ds3.isel(nshear=i)
        ds_sensor["psi_k_sh"].plot(ax=ax, x="waveno", hue="time", add_legend=False)

        if ds4 is not None:
            waveno_sensor = np.asarray(ds_sensor["waveno"].values)
            waveno_pos = waveno_sensor[np.isfinite(waveno_sensor) & (waveno_sensor > 0)]
            if waveno_pos.size > 0:
                waveno = np.logspace(
                    np.log10(np.nanmin(waveno_pos)),
                    np.log10(np.nanmax(waveno_pos)),
                    200,
                )
                eps = ds4["eps"].isel(nshear=i).values
                kolm_length = ds4["kolm_length"].isel(nshear=i).values
                molvisc = (eps * kolm_length**4) ** (1 / 3)
                psi = model_spectrum(waveno, eps, molvisc)

                ax.plot(waveno, psi.T, color="k", alpha=0.3, linewidth=2, ls="-")

        ax.set_xscale("log")
        ax.set_ylim(np.nanpercentile(ds3["psi_k_sh"], 0.1), None)
        ax.set_yscale("log")
        ax.set_title(f"Shear {i+1}")
        ax.grid(True, alpha=0.3, which="both")

    # add sensor speed and auxiliary variables
    for ax, var in zip(ts_axes, ts_vars):
        ds3[var].plot(ax=ax, color="k", linewidth=1)
        ax.grid(True, alpha=0.3)

    ts_axes[-1].set_xlabel("Time")
    plot_section_numbers(ts_axes, ds3.time.values, ds3.section_number.values)

    axs = [*spectra_axes, *ts_axes]

    fig.suptitle(f"Level 3{_subset_suffix(subset)}")
    plt.tight_layout(rect=(0, 0, 1, 0.96))
    return fig, axs

plot_level4(*data, subset=None)

Plot turbulent dissipation rate time series and quality metrics.

Parameters

*data : Any ShearLevel4 required. Also accepts ShearProcessing, xr.Dataset, or xr.DataTree objects. subset : list of tuple, optional List of (variable, vmin, vmax) bounds for subsetting. If None, no subsetting is applied.

Returns

tuple of (matplotlib.figure.Figure, list of Axes) Figure with eps time series panel and quality metric panels for each sensor.

Source code in turban/utils/plot/shear.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def plot_level4(*data: Any, subset: SubsetSpec | None = None):
    """Plot turbulent dissipation rate time series and quality metrics.

    Parameters
    ----------
    *data : Any
        ShearLevel4 required. Also accepts ShearProcessing, xr.Dataset, or
        xr.DataTree objects.
    subset : list of tuple, optional
        List of ``(variable, vmin, vmax)`` bounds for subsetting. If None, no
        subsetting is applied.

    Returns
    -------
    tuple of (matplotlib.figure.Figure, list of Axes)
        Figure with eps time series panel and quality metric panels for each
        sensor.
    """
    levels = _parse_level_inputs(*data)

    ds = _clip(_to_dataset(levels.get(4)), subset)
    nshear = len(ds.nshear)

    # Create figure with panels for eps and quality metrics
    fig, axs = plt.subplots(nshear + 1, 1, figsize=(12, 3 + 2 * nshear), sharex=True)
    if nshear == 0:
        axs = [axs]

    # Panel 1: Eps time series for all sensors
    ax = axs[0]
    for i in range(nshear):
        ds.eps.isel(nshear=i).plot(ax=ax, x="time", label=f"Shear {i+1}")

    ax.set_yscale("log")
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.set_title("Turbulent Dissipation Rate")

    # Remaining panels: Quality metrics for each sensor
    for i in range(nshear):
        ax = axs[i + 1]
        plot_quality_metric(
            ax,
            ds.time.values,
            ds.quality_metric.isel(nshear=i).values,
            q_codes=QUALITY_METRIC_CODES,
            maxq=16,
        )
        ax.set_title(f"Shear {i+1} Quality Metrics")

    fig.suptitle(f"Level 4{_subset_suffix(subset)}")
    plt.tight_layout(rect=(0, 0, 1, 0.96))
    return fig, axs

Variables

Standard variable names (variables.py)

Defines standard names of variables used in the code base. Code should deviate from these names only with good reason.

Dictionary `VARIABLES`` is a nested structure of the form: {turban_standard_name: {attribute_name: attribute_value}}.

To make alphabetic sorting easier, this is here coded as the simpler dictionary _vars: