Code Gorgy Time

Top Page

Reply to this message
Author: Frédéric
Date:  
To: Guilde
Subject: Code Gorgy Time


-- 
    Frédéric

# -*- coding: utf-8 -*-

""" IRIG.B122 timecode generator.

An IRIG.B122 timecode is written over a 1KHz sinusoidal carrier.
The timecode consists of 100 bits, each bit is made of 10 carrier periods.

Bits are encoded as follows:

- a marker bit is 8 periods at large amplitude, and 2 periods at reduced (1/3) amplitude;
- a 0 bit is 2 periods at large amplitude, and 8 periods at reduced (1/3) amplitude;
- a 1 bit is 5 periods at large amplitude, and 5 periods at reduced (1/3) amplitude.
"""

import time
import math
import array
import uctypes

import rp2

from irigBTimecode import IrigBTimecode

# DMA TREQ_SEL value used for the frame DMA: unpaced transfer (no peripheral request needed)
TREQ_PERMANENT = 0x3f
# Index, in the DMA channel register block, of the read address trigger alias
READ_ADD_TRIG_REG = 15 # see RP2040 datasheet "2.5.2.1 Aliases and Triggers"

CARRIER_FREQ          = 1_000  # IRIG.B carrier clock (1kHz)
NB_SAMPLES_PER_PERIOD = 256    # nb samples in a single period
NB_PERIODS_PER_BIT    = 10     # nb periods in a single IRIG.B122 bit
CYCLIC_RATIO = {               # cyclic ratio per bit type (fraction of the bit sent at large amplitude)
    'm': 0.8,  # marker bit
    '0': 0.2,  # 0 bit
    '1': 0.5   # 1 bit
}
CARRIER_RATIO = 3  # ratio between large and small carrier amplitudes



class IrigB122:
    """ IRIG.B122 timecode generator base class.

    One waveform per bit type (marker, 0 and 1) is pre-computed at init.
    Sending a timecode relies on two DMA channels:

    - the wave DMA streams the samples of a single bit to the output peripheral; its
      configuration is hardware specific and must be provided by _dmaWaveConfig();
    - the frame DMA feeds the read address trigger register of the wave DMA with the
      address of the waveform of each bit of the timecode, one bit at a time.
    """

    def __init__(self, offset, amplitude, ratio=CARRIER_RATIO, nbSamples=NB_SAMPLES_PER_PERIOD):
        """ Init generator.

        @param offset: mean value of the carrier samples
        @param amplitude: peak amplitude of the large carrier, around offset
        @param ratio: ratio between large and reduced carrier amplitudes
        @param nbSamples: number of samples in a single carrier period
        """
        # A single carrier period, at large and at reduced amplitude
        carrierLarge = self._buildCarrier(offset, amplitude, nbSamples)
        carrierSmall = self._buildCarrier(offset, amplitude / ratio, nbSamples)

        # One complete waveform (NB_PERIODS_PER_BIT periods) per bit type
        self._bitMarkerWave = self._buildBitWave(CYCLIC_RATIO['m'], carrierLarge, carrierSmall)
        print(f"DEBUG::_bitMarkerWave address = {uctypes.addressof(self._bitMarkerWave)}")

        self._bit0Wave = self._buildBitWave(CYCLIC_RATIO['0'], carrierLarge, carrierSmall)
        print(f"DEBUG::_bit0Wave address = {uctypes.addressof(self._bit0Wave)}")

        self._bit1Wave = self._buildBitWave(CYCLIC_RATIO['1'], carrierLarge, carrierSmall)
        print(f"DEBUG::_bit1Wave address = {uctypes.addressof(self._bit1Wave)}")

        # DMA walking through the frame (one waveform address per bit)
        self._dmaFrame = rp2.DMA()
        self._dmaFrame.active(False)
        # self._dmaFrame.irq(self._dmaFrameIsr)  # only 1 hard interrupt to avoid conflict
        print(f"DMA Frame channel = {self._dmaFrame.channel}")

        # DMA streaming the samples of the current bit
        self._dmaWave = rp2.DMA()
        self._dmaWave.active(False)
        # self._dmaWave.irq(self._dmaWaveIsr, hard=True)
        print(f"DMA Wave channel = {self._dmaWave.channel}")

        self._currentTime = None  # in s, since epoch
        self._frame = None        # kept referenced while the frame DMA reads it

    @staticmethod
    def _buildCarrier(offset, amplitude, nbSamples):
        """ Build a single sine period, as an array('I') of nbSamples samples.
        """
        return array.array('I', [round(offset + amplitude * math.sin(2 * math.pi * t / nbSamples))
                                 for t in range(nbSamples)])

    @staticmethod
    def _buildBitWave(cyclicRatio, carrierLarge, carrierSmall):
        """ Build the waveform of a bit: large amplitude periods, then reduced amplitude ones.

        The number of large periods is rounded (not truncated): float errors such as
        (1 - 0.8) * 10 == 1.9999999999999996 must not drop a period. The reduced periods
        are the remainder, so that a bit always lasts NB_PERIODS_PER_BIT periods.
        """
        nbLarge = round(cyclicRatio * NB_PERIODS_PER_BIT)
        nbSmall = NB_PERIODS_PER_BIT - nbLarge

        wave = array.array('I')
        for _ in range(nbLarge):
            wave.extend(carrierLarge)
        for _ in range(nbSmall):
            wave.extend(carrierSmall)

        return wave

    # def _dmaFrameIsr(self, dma):
    #     """
    #     """
    #     print("DEBUG::_dmaFrameIsr::dma.read =", dma.read)
    #
    # def _dmaWaveIsr(self, dma):
    #     """
    #     """
    #     print("DEBUG::_dmaWaveIsr::dma.read =", dma.read)

    @property
    def currentTime(self):
        """ Current time, in s since epoch (None until run() is called).
        """
        return self._currentTime

    def _dmaWaveConfig(self):
        """ Configure waveform generation DMA.

        Must be overridden by the hardware specific sub-class.
        """
        raise NotImplementedError("_dmaWaveConfig() must be implemented in real class!")

    def timeToTimecode(self, time_=None):
        """ Convert given time to IRIG.B timecode.

        @param time_: time to convert, passed as is to the IrigBTimecode builder

        @return: timecode built by IrigBTimecode
        """
        # IrigBTimecode takes no constructor argument: the conversion is done by calling the instance
        return IrigBTimecode()(time_)

    def timecodeToFrame(self, timecode):
        """ Convert IRIG.B Timecode to frame (array('I'))

        Each symbol of the timecode is replaced by the address of the matching waveform:
        'R' and 'P' are marker bits, 'I', 'E' and '0' are 0 bits, '1' is a 1 bit.
        Any other symbol is ignored. A null address ends the frame.

        @param timecode: iterable of timecode symbols

        @return: frame, as an array('I') of waveform addresses
        """
        markerAddress = uctypes.addressof(self._bitMarkerWave)
        bit0Address = uctypes.addressof(self._bit0Wave)
        bit1Address = uctypes.addressof(self._bit1Wave)
        waveAddresses = {
            'R': markerAddress, 'P': markerAddress,
            'I': bit0Address, 'E': bit0Address, '0': bit0Address,
            '1': bit1Address,
        }

        frame = array.array('I')
        for symbol in timecode:
            address = waveAddresses.get(symbol)
            if address is not None:
                frame.append(address)
        frame.append(0)  # mark end of frame

        return frame

    def send(self, timecode):
        """ Send IRIG.B122 timecode.

        @param timecode: iterable of timecode symbols (see timecodeToFrame())
        """
        self._frame = self.timecodeToFrame(timecode)
        print(f"DEBUG::frame address = {uctypes.addressof(self._frame)} ({self._frame})")

        self._dmaWaveConfig()

        # Configure Frame DMA
        dmaFrameCtrl = self._dmaFrame.pack_ctrl(enable=True,
                                                high_pri=True,
                                                size=2,
                                                inc_read=True,
                                                inc_write=False,
                                                ring_size=0, ring_sel=False,
                                                chain_to=self._dmaFrame.channel,  # disable chaining
                                                treq_sel=TREQ_PERMANENT,          # unpaced transfer
                                                irq_quiet=True,  #False,
                                                bswap=False,
                                                sniff_en=False,
                                                write_err=False,
                                                read_err=False)

        # A single address is transferred each time the frame DMA is triggered
        self._dmaFrame.config(read=self._frame,
                              write=self._dmaWave.registers[READ_ADD_TRIG_REG:],  # must use a slice - See https://github.com/micropython/micropython/issues/16083
                              count=1,
                              ctrl=dmaFrameCtrl,
                              trigger=True)  # start DMA

        # self._dmaFrame.active(True)  # start DMA

    def cancel(self):
        """ Cancel generator
        """
        self._dmaWave.active(False)   # cancel DMA
        self._dmaFrame.active(False)  # cancel DMA

    def isBusy(self):
        """ Check if generator is busy

        @return: True while the frame DMA has not read the whole frame; False if nothing was sent
        """
        if self._frame is None:
            return False

        # Frame items are 4 bytes wide (array('I'))
        return self._dmaFrame.read < uctypes.addressof(self._frame) + 4 * len(self._frame)

    def run(self):
        """ Continuously generates IRIG.B122 Timecodes.

        Get current time at startup and send to clock.

        Check if we need to send the timecode over and over, or if the clock can run by
        itself between synchronizations.
        If not, use internal RTC, and send new timecode every second.

        If the RTC drifts, resync every day at midnight from a ntp server.

        Not implemented yet: only stores a hard-coded current time.
        """
        self._currentTime = 1729512607  # "2024-10-21 14:10:07" (2024, 10, 21, 14, 10, 7, 0, 295, 0)


# -*- coding: utf-8 -*-

""" IRIG.B122 timecode generator.

Waveforms are generated using a DAC built around a PIO + R2R network.
For each bit, a DMA feeds the PIO, and sends the 10 periods according to the bit encoding.
To send the complete timecode, a second DMA feeds the first DMA with the address of each bit in the timecode.
"""

import rp2

from irigB122 import CARRIER_FREQ, NB_SAMPLES_PER_PERIOD, NB_PERIODS_PER_BIT, IrigB122

PIO_0_TXF0    = 0x50200010  # address written by the wave DMA (TX FIFO of PIO0, state machine 0)
DREQ_PIO0_TX0 = 0x00        # DMA transfer request paced by that TX FIFO


NB_DAC_BITS = 8                                     # nb bits for dac value
SM_NUM      = 0                                     # state machine number
SM_FREQ     = CARRIER_FREQ * NB_SAMPLES_PER_PERIOD  # Hz - TODO: ensure accurate frequency, and frequency without fractional clock division to avoid jitter


# Mid-scale of the DAC: samples span OFFSET +/- AMPLITUDE, i.e. 1..255 for 8 bits.
# (2 << NB_DAC_BITS) would be twice the DAC range and overflow the NB_DAC_BITS output bits.
OFFSET    = (1 << NB_DAC_BITS) // 2
AMPLITUDE = OFFSET - 1


DAC_LSB_PIN = 0  # first (LSB) GPIO of the DAC bus, used by test()


class IrigB122Dac(IrigB122):
    """ IRIG.B122 timecode generator, using a PIO state machine as DAC bus driver
    """
    def __init__(self, dacLsbPin):
        """ Init generator

        @param dacLsbPin: first pin of the DAC bus (out_base of the state machine)
        """
        super().__init__(OFFSET, AMPLITUDE)

        self._dacLsbPin = dacLsbPin

        # The state machine runs at the sample rate, and is started right away
        self._sm = rp2.StateMachine(SM_NUM, self._pioCode, freq=SM_FREQ, out_base=dacLsbPin)
        self._sm.active(True)

    @staticmethod
    @rp2.asm_pio(
        out_init=(rp2.PIO.OUT_LOW,) * NB_DAC_BITS,
        out_shiftdir=rp2.PIO.SHIFT_RIGHT,
        autopull=True,
        pull_thresh=NB_DAC_BITS,
    )
    def _pioCode():
        """ PIO program

        A single instruction: each sample is auto-pulled into the OSR, then written to the pins
        """
        out(pins, 8)  # NB_DAC_BITS

    def _dmaWaveConfig(self):
        """ Configure waveform generation DMA: one complete bit is sent to the PIO TX FIFO,
        then the frame DMA is chained to fetch the next bit.
        """
        nbSamplesPerBit = NB_PERIODS_PER_BIT * NB_SAMPLES_PER_PERIOD

        ctrlFields = {
            'enable': True,
            'high_pri': True,
            'size': 2,
            'inc_read': True,
            'inc_write': False,
            'ring_size': 0,
            'ring_sel': False,
            'chain_to': self._dmaFrame.channel,
            'treq_sel': DREQ_PIO0_TX0,
            'irq_quiet': False,
            'bswap': False,
            'sniff_en': False,
            'write_err': False,
            'read_err': False,
        }
        ctrl = self._dmaWave.pack_ctrl(**ctrlFields)

        self._dmaWave.config(write=PIO_0_TXF0, count=nbSamplesPerBit, ctrl=ctrl)



def test():
    """ Send a single timecode through the DAC generator, and wait for its completion.

    Timecode symbols (see: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode):
    R: reference marker (marker bit)
    I: index marker (0 bit)
    P: position identifier (marker bit)
    E: empty bit (0 bit)
    0: bit clear (0 bit)
    1: bit set (1 bit)

    Example: 2024-10-21 14:10:07 (Day of year=295)
    timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
                 |||| ||| |||| |||  |||| ||   |||| |||| ||
                    7   0    0   1     4  1      5    9  2
    """
    timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"

    generator = IrigB122Dac(DAC_LSB_PIN)
    try:
        generator.send(timecode)

        # Busy wait until the whole frame has been read by the DMA
        busy = generator.isBusy()
        while busy:
            busy = generator.isBusy()

    finally:
        # Always stop the DMAs, even on error or keyboard interrupt
        print("INFO::End")
        generator.cancel()



# Run the DAC generator test when the module is executed as a script
if __name__ == "__main__":
    test()

# -*- coding: utf-8 -*-

""" IRIG.B122 timecode generator.

Waveforms are generated using a PWM (and a low pass filter).
For each bit, a DMA feeds the PWM, and sends the 10 periods according to the bit encoding.
To send the complete timecode, a second DMA feeds the first DMA with the address of each bit in the timecode.
"""

import machine

from irigB122 import CARRIER_FREQ, NB_SAMPLES_PER_PERIOD, NB_PERIODS_PER_BIT, IrigB122

# Counter compare register (CHn_CC) of each PWM slice
PWM_CH0_CC = 0x4005000c
PWM_CH1_CC = 0x40050020
PWM_CH2_CC = 0x40050034
PWM_CH3_CC = 0x40050048
PWM_CH4_CC = 0x4005005c
PWM_CH5_CC = 0x40050070
PWM_CH6_CC = 0x40050084
PWM_CH7_CC = 0x40050098

# DMA transfer request raised on wrap of each PWM slice
DREQ_PWM_WRAP0 = 0x18
DREQ_PWM_WRAP1 = 0x19
DREQ_PWM_WRAP2 = 0x1a
DREQ_PWM_WRAP3 = 0x1b
DREQ_PWM_WRAP4 = 0x1c
DREQ_PWM_WRAP5 = 0x1d
DREQ_PWM_WRAP6 = 0x1e
DREQ_PWM_WRAP7 = 0x1f

# The tables below are indexed by GPIO number (GPIO00..GPIO29).
# Two consecutive GPIOs share a slice, and slices repeat every 16 GPIOs:
# GPIO00/01 -> slice 0, ..., GPIO14/15 -> slice 7, GPIO16/17 -> slice 0, ..., GPIO28/29 -> slice 6
_NB_GPIOS = 30
_NB_SLICES = 8

_PWM_SLICE_CC = (PWM_CH0_CC, PWM_CH1_CC, PWM_CH2_CC, PWM_CH3_CC,
                 PWM_CH4_CC, PWM_CH5_CC, PWM_CH6_CC, PWM_CH7_CC)

_DREQ_PWM_SLICE_WRAP = (DREQ_PWM_WRAP0, DREQ_PWM_WRAP1, DREQ_PWM_WRAP2, DREQ_PWM_WRAP3,
                        DREQ_PWM_WRAP4, DREQ_PWM_WRAP5, DREQ_PWM_WRAP6, DREQ_PWM_WRAP7)

# Compare register to write, per GPIO
PWM_CHn_CC = [_PWM_SLICE_CC[(gpio // 2) % _NB_SLICES] for gpio in range(_NB_GPIOS)]

# Wrap transfer request, per GPIO
DREQ_PWM_WRAPn = [_DREQ_PWM_SLICE_WRAP[(gpio // 2) % _NB_SLICES] for gpio in range(_NB_GPIOS)]

# Bit shift per GPIO: 0 for even GPIOs, 16 for odd GPIOs (one entry per GPIO, like the tables above)
BIT_SHIFT = [16 * (gpio % 2) for gpio in range(_NB_GPIOS)]


# NOTE(review): assumes the system clock runs at 125MHz - confirm on target
F_CPU = 125_000_000 # 125MHz

# One PWM period per carrier sample
PWM_FREQ = CARRIER_FREQ * NB_SAMPLES_PER_PERIOD # Hz

# Half the number of clock cycles in a PWM period; the carrier samples are OFFSET +/- AMPLITUDE.
# Because of the true division, OFFSET and AMPLITUDE are floats.
OFFSET    = (F_CPU / PWM_FREQ) // 2
AMPLITUDE = OFFSET - 1


# GPIO used by test()
PWM_PIN = 0


class IrigB122Pwm(IrigB122):
    """ IRIG.B122 timecode generator, using a PWM output
    """
    def __init__(self, pwmPin):
        """ Init generator

        @param pwmPin: GPIO number of the PWM output; also selects the compare register
                       and the transfer request used by the wave DMA
        """
        super().__init__(OFFSET, AMPLITUDE)

        self._pwmPin = pwmPin

        # PWM starts with a null duty cycle; compare values are then written by the wave DMA
        outputPin = machine.Pin(pwmPin, machine.Pin.OUT)
        self._pwm = machine.PWM(outputPin, freq=PWM_FREQ, duty_u16=0)

    def _dmaWaveConfig(self):
        """ Configure waveform generation DMA: one sample is written to the PWM compare register
        on each PWM wrap; once a complete bit is sent, the frame DMA is chained to fetch the next bit.
        """
        nbSamplesPerBit = NB_PERIODS_PER_BIT * NB_SAMPLES_PER_PERIOD

        ctrlFields = {
            'enable': True,
            'high_pri': True,
            'size': 2,
            'inc_read': True,
            'inc_write': False,
            'ring_size': 0,
            'ring_sel': False,
            'chain_to': self._dmaFrame.channel,
            'treq_sel': DREQ_PWM_WRAPn[self._pwmPin],
            'irq_quiet': True,  #False,
            'bswap': False,
            'sniff_en': False,
            'write_err': False,
            'read_err': False,
        }
        ctrl = self._dmaWave.pack_ctrl(**ctrlFields)

        self._dmaWave.config(write=PWM_CHn_CC[self._pwmPin], count=nbSamplesPerBit, ctrl=ctrl)



def test():
    """ Send a single timecode through the PWM generator, and wait for its completion.

    Timecode symbols (see: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode):
    R: reference marker (marker bit)
    I: index marker (0 bit)
    P: position identifier (marker bit)
    E: empty bit (0 bit)
    0: bit clear (0 bit)
    1: bit set (1 bit)

    Example: 2024-10-21 14:10:07 (Day of year=295)
    timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
                 |||| ||| |||| |||  |||| ||   |||| |||| ||
                    7   0    0   1     4  1      5    9  2
    """
    timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"

    generator = IrigB122Pwm(PWM_PIN)
    try:
        generator.send(timecode)

        # Busy wait until the whole frame has been read by the DMA
        busy = generator.isBusy()
        while busy:
            busy = generator.isBusy()

    finally:
        # Always stop the DMAs, even on error or keyboard interrupt
        print("INFO::End")
        generator.cancel()



# Run the PWM generator test when the module is executed as a script
if __name__ == "__main__":
    test()

# -*- coding: utf-8 -*-

""" IRIG.B122 timecode builder.
"""
import time

# Indexes of the fields in a time tuple
YEAR = 0
MONTH = 1
DAY = 2
HOURS = 3
MINUTES = 4
SECONDS = 5
DUMMY = 6  # NOTE(review): presumably the weekday field - confirm
DAY_OF_YEAR = 7
TZ = 8 # only available on unix port

# Number of days in each month (January first), February counted as 28 days
DAYS_PER_MONTH = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)


class IrigBTimecode:
    """ IRIG.B122 timecode builder class

    An instance is callable: it converts a time to its timecode, returned as a string of
    symbols. Numeric fields are BCD encoded, least significant bit first.

    See: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode
    R: reference marker (marker bit)
    I: index marker (0 bit)
    P: position identifier (marker bit)
    E: empty bit (0 bit)
    0: bit clear (0 bit)
    1: bit set (1 bit)

    Example: 2024-10-21 14:10:07 (Day of year=295)
    timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
                 |||| ||| |||| |||  |||| ||   |||| |||| ||
                    7   0    0   1     4  1      5    9  2
    """
    def _dec2Bin(self, decimal, digit=4):
        """ Convert a decimal value to its binary representation (MSB first).

        @param decimal: value to convert
        @param digit: minimum number of binary digits (zero padded)

        @return: binary representation, as a string of '0' and '1'
        """
        return f"{decimal:0{digit}b}"

    def _bcdField(self, value, nbBits):
        """ Encode a decimal digit as a timecode field: nbBits binary digits, LSB first.
        """
        return "".join(reversed(self._dec2Bin(value, nbBits)))

    def computeDayOfYear(self, year, month, day):
        """ Compute Day of Year from year, month and day

        Note about leap year (https://en.wikipedia.org/wiki/Leap_year#Gregorian_calendar):
        Every year that is exactly divisible by four is a leap year, except for years that are exactly
        divisible by 100, but these centurial years are leap years if they are exactly divisible by 400.
        For example, the years 1700, 1800, and 1900 are not leap years, but the years 1600 and 2000 are.

        @param year: full year (e.g. 2024)
        @param month: month, from 1 (January) to 12
        @param day: day of month, starting from 1

        @return: day of year, starting from 1
        """
        # Days of the full months before the current one, plus days in the current month
        doy = sum(DAYS_PER_MONTH[:month - 1]) + day

        # The leap day only shifts the dates after February
        isLeapYear = (year % 4 == 0 and year % 100 != 0) or year % 400 == 0
        if isLeapYear and month > 2:
            doy += 1

        return doy

    def __call__(self, t=None):
        """ Convert given time to IRIG.B timecode.

        @param t: time as returned by time.localtime/mktime(), or by time.time()
                      If None, use current time
        @type t: tuple or int

        @return: timecode, as a string of symbols (see class documentation)
        """
        if t is None:
            t = time.localtime()
        elif isinstance(t, (int, float)):
            t = time.localtime(t)

        seconds = t[SECONDS]
        minutes = t[MINUTES]
        hours = t[HOURS]
        dayOfYear = t[DAY_OF_YEAR]

        bcd = self._bcdField
        fields = (
            'R',  # start of timecode

            # Seconds: units, index marker, tens
            bcd(seconds % 10, 4), 'I', bcd(seconds // 10, 3),
            'P',  # position identifier

            # Minutes: units, index marker, tens, empty bit
            bcd(minutes % 10, 4), 'I', bcd(minutes // 10, 3), 'E',
            'P',  # position identifier

            # Hours: units, index marker, tens, empty bits
            bcd(hours % 10, 4), 'I', bcd(hours // 10, 2), "EE",
            'P',  # position identifier

            # Day of year: units, index marker, tens, position identifier, hundreds - TODO: check if needed
            bcd(dayOfYear % 10, 4), 'I', bcd((dayOfYear // 10) % 10, 4),
            'P',  # position identifier
            bcd((dayOfYear // 100) % 10, 2),

            # Complete unused part of timecode
            "EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP",
        )

        return "".join(fields)



def test():
    """ Build the timecode of a known date, and compare it to its expected value.
    """
    expected = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
    # time.strptime("2024-10-21 14:10:07", "%Y-%m-%d %H:%M:%S") unsupported under MicroPython
    knownTime = (2024, 10, 21, 14, 10, 7, 0, 295)  # 1729512607

    builder = IrigBTimecode()
    doy = builder.computeDayOfYear(2024, 10, 21)
    print("2024-10-21 14:10:07 -> doy =", doy)

    generated = builder(knownTime)
    print(f"original : {expected}")
    print(f"generated: {generated}")
    assert expected == generated, "Incorrect timecode!"



# Run the timecode builder test when the module is executed as a script
if __name__ == "__main__":
    test()