--
Frédéric
# -*- coding: utf-8 -*-
""" IRIG.B122 timecode generator.
An IRIG.B122 timecode is written over a 1KHz sinusoidal carrier.
The timecode consists of 100 bits, each bit is made of 10 carrier periods.
Bits are encoded as followed:
- a marker bit is 8 periods at large amplitude, and 2 periods at reduced (1/3) amplitude;
- a 0 bit is 2 periods at large amplitude, and 8 periods at reduced (1/3) amplitude.
- a 1 bit is 5 periods at large amplitude, and 5 periods at reduced (1/3) amplitude;
"""
import time
import math
import array
import uctypes
import rp2
from irigBTimecode import IrigBTimecode
TREQ_PERMANENT = 0x3f
READ_ADD_TRIG_REG = 15 # see RP2040 datasheet "2.5.2.1 Aliases and Triggers"
CARRIER_FREQ = 1_000 # IRIG.B carrier clock (1kHz)
NB_SAMPLES_PER_PERIOD = 256 # nb samples in a single period
NB_PERIODS_PER_BIT = 10 # nb periods in a single IRIG.B122 bit
CYCLIC_RATIO = { # cyclic ratio per bit type
'm': 0.8, # marker bit
'0': 0.2, # 0 bit
'1': 0.5 # 1 bit
}
CARRIER_RATIO = 3 # ratio bewteen large and small carrier amplitudes
class IrigB122:
"""
"""
def __init__(self, offset, amplitude, ratio=CARRIER_RATIO, nbSamples=NB_SAMPLES_PER_PERIOD):
"""
"""
carrierLarge = [round(offset + amplitude * math.sin(2 * math.pi * t / nbSamples)) for t in range(nbSamples)]
carrierSmall = [round(offset + amplitude / CARRIER_RATIO * math.sin(2 * math.pi * t / nbSamples)) for t in range(nbSamples)]
self._bitMarkerWave = array.array('I')
for b in int(CYCLIC_RATIO['m'] * NB_PERIODS_PER_BIT) * carrierLarge + int((1-CYCLIC_RATIO['m']) * NB_PERIODS_PER_BIT) * carrierSmall:
self._bitMarkerWave.append(b)
print(f"DEBUG::_bitMarkerWave address = {uctypes.addressof(self._bitMarkerWave)}")
self._bit0Wave = array.array('I')
for b in int(CYCLIC_RATIO['0'] * NB_PERIODS_PER_BIT) * carrierLarge + int((1-CYCLIC_RATIO['0']) * NB_PERIODS_PER_BIT) * carrierSmall:
self._bit0Wave.append(b)
print(f"DEBUG::_bit0Wave address = {uctypes.addressof(self._bit0Wave)}")
self._bit1Wave = array.array('I')
for b in int(CYCLIC_RATIO['1'] * NB_PERIODS_PER_BIT) * carrierLarge + int((1-CYCLIC_RATIO['1']) * NB_PERIODS_PER_BIT) * carrierSmall:
self._bit1Wave.append(b)
print(f"DEBUG::_bit1Wave address = {uctypes.addressof(self._bit1Wave)}")
self._dmaFrame = rp2.DMA()
self._dmaFrame.active(False)
# self._dmaFrame.irq(self._dmaFrameIsr) # only 1 hard interupt to avoid conflict
print(f"DMA Frame channel = {self._dmaFrame.channel}")
self._dmaWave = rp2.DMA()
self._dmaWave.active(False)
# self.dmaWave.irq(self._dmaWaveIsr, hard=True)
print(f"DMA Wave channel = {self._dmaWave.channel}")
self.
self._currentTime = None # in s, since epoch
self._frame = None
# def _dmaFrameIsr(self, dma):
# """
# """
# print("DEBUG::_dmaFrameIsr::dma.read =", dma.read)
#
# def _dmaWaveIsr(self, dma):
# """
# """
# print("DEBUG::_dmaWaveIsr::dma.read =", dma.read)
@property
def currentTime(self):
"""
"""
return self._currentTime
def _dmaWaveConfig(self):
""" Configure waveform generation DMA.
"""
raise NotImplementedError("_dmaWaveConfig() must be implemented in real class!")
def timeToTimecode(self, time_=None):
"""
"""
return IrigBTimecode(time_)
def timecodeToFrame(self, timecode):
""" Convert IRIG.B Timecode to frame (array('I'))
"""
frame = array.array('I')
for b in timecode:
if b in ('R', 'P'):
frame.append(uctypes.addressof(self._bit0Wave))
if b in ('I', 'E', '0'):
frame.append(uctypes.addressof(self._bit0Wave))
if b == '1':
frame.append(uctypes.addressof(self._bit1Wave))
frame.append(0) # mark end of frame
return frame
def send(self, timecode):
""" Send IRIG.B122 timecode.
"""
self._frame = self.timecodeToFrame(timecode)
print(f"DEBUG::frame address = {uctypes.addressof(self._frame)} ({self._frame})")
self._dmaWaveConfig()
# Configure Frame DMA
dmaFrameCtrl = self._dmaFrame.pack_ctrl(enable=True,
high_pri=True,
size=2,
inc_read=True,
inc_write=False,
ring_size=0, ring_sel=False,
chain_to=self._dmaFrame.channel, # disable chaining
treq_sel=TREQ_PERMANENT, # unpaced transfert, synchronization is done by PWM wrap
irq_quiet=True, #False,
bswap=False,
sniff_en=False,
write_err=False,
read_err=False)
self._dmaFrame.config(read=self._frame,
write=self._dmaWave.registers[READ_ADD_TRIG_REG:], # must use a slice - See https://github.com/micropython/micropython/issues/16083
count=1,
ctrl=dmaFrameCtrl)
trigger=True) # start DMA
# self._dmaFrame.active(True) # start DMA
def cancel(self):
""" Cancel generator
"""
self._dmaWave.active(False) # cancel DMA
self._dmaFrame.active(False) # cancel DMA
def isBusy(self):
""" Check if generator is busy
"""
return self._dmaFrame.read < uctypes.addressof(self._frame) + 4 * len(self._frame)
def run(self):
""" Continuously generates IRIG.B122 Timecodes.
Get current time at startup and send to clock.
Check if we need to send the timecode over and over, or if the clock can run by
itself between synchronizations.
If not, use internal RTC, and send new timecode every second.
If the RTC drifts, resync every day at midnight from a ntp server.
"""
self._currentTime = 1729512607 # "2024-10-21 14:10:07" (2024, 10, 21, 14, 10, 7, 0, 295, 0)
# -*- coding: utf-8 -*-
""" IRIG.B122 timecode generator.
Waveforms are generated using a DAC build arround a PIO + R2R network.
For each bit, a DMA feeds the PIO, and sends the 10 periods according to the bit encoding.
To send the complete timecode, a second DMA feeds the first DMA with the address of each bit in the timecode.
"""
import rp2
from irigB122 import CARRIER_FREQ, NB_SAMPLES_PER_PERIOD, NB_PERIODS_PER_BIT, IrigB122
PIO_0_TXF0 = 0x50200010
DREQ_PIO0_TX0 = 0x00
NB_DAC_BITS = 8 # nb bits for dac value
SM_NUM = 0 # state machine number
SM_FREQ = CARRIER_FREQ * NB_SAMPLES_PER_PERIOD # Hz - TODO: ensure accurate frequency, and frequency without fractional clock division to avoid jitter
OFFSET = (2 << NB_DAC_BITS) // 2
AMPLITUDE = OFFSET - 1
DAC_LSB_PIN = 0
class IrigB122Dac(IrigB122):
""" IRIG.B122 timecode generator class
"""
def __init__(self, dacLsbPin):
""" Init generator
"""
super().__init__(OFFSET, AMPLITUDE)
self._dacLsbPin = dacLsbPin
self._sm = rp2.StateMachine(SM_NUM, self._pioCode, freq=SM_FREQ, out_base=dacLsbPin)
self._sm.active(True)
@staticmethod
@rp2.asm_pio(out_init=NB_DAC_BITS*(rp2.PIO.OUT_LOW,), out_shiftdir=rp2.PIO.SHIFT_RIGHT, autopull=True, pull_thresh=NB_DAC_BITS)
def _pioCode():
""" PIO code
pins are automatically pulled from the OSR and written to the bus at the SM frequency
"""
out(pins, 8) # NB_DAC_BITS
def _dmaWaveConfig(self):
dmaWaveCtrl = self._dmaWave.pack_ctrl(enable=True,
high_pri=True,
size=2,
inc_read=True,
inc_write=False,
ring_size=0, ring_sel=False,
chain_to=self._dmaFrame.channel,
treq_sel=DREQ_PIO0_TX0,
irq_quiet=False,
bswap=False,
sniff_en=False,
write_err=False,
read_err=False)
self._dmaWave.config(write=PIO_0_TXF0,
count=NB_PERIODS_PER_BIT*NB_SAMPLES_PER_PERIOD,
ctrl=dmaWaveCtrl)
def test():
"""
# See: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode
# R: reference marker (marker bit)
# I: index marker (0 bit)
# P: position identifier (marker bit)
# E: empty bit (0 bit)
# 0: bit clear (0 bit)
# 1: bit set (1 bit)
# Example: 2024-10-21 14:10:07 (Day of year=295)
timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
# |||| ||| |||| ||| |||| || |||| |||| ||
# 7 0 0 1 4 1 5 9 2
"""
irigB122Dac = IrigB122Dac(DAC_LSB_PIN)
try:
timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
irigB122Dac.send(timecode)
while irigB122Dac.isBusy():
pass
finally:
print("INFO::End")
irigB122Dac.cancel()
if __name__ == "__main__":
test()
# -*- coding: utf-8 -*-
""" IRIG.B122 timecode generator.
Waveforms are generated using a PWM (and a low pass filter).
For each bit, a DMA feeds the PWM, and sends the 10 periods according to the bit encoding.
To send the complete timecode, a second DMA feeds the first DMA with the address of each bit in the timecode.
"""
import machine
from irigB122 import CARRIER_FREQ, NB_SAMPLES_PER_PERIOD, NB_PERIODS_PER_BIT, IrigB122
PWM_CH0_CC = 0x4005000c
PWM_CH1_CC = 0x40050020
PWM_CH2_CC = 0x40050034
PWM_CH3_CC = 0x40050048
PWM_CH4_CC = 0x4005005c
PWM_CH5_CC = 0x40050070
PWM_CH6_CC = 0x40050084
PWM_CH7_CC = 0x40050098
PWM_CHn_CC = [
PWM_CH0_CC, PWM_CH0_CC, # GPIO00, GPIO01
PWM_CH1_CC, PWM_CH1_CC,
PWM_CH2_CC, PWM_CH2_CC,
PWM_CH3_CC, PWM_CH3_CC,
PWM_CH4_CC, PWM_CH4_CC,
PWM_CH5_CC, PWM_CH5_CC,
PWM_CH6_CC, PWM_CH6_CC,
PWM_CH7_CC, PWM_CH7_CC,
PWM_CH0_CC, PWM_CH0_CC,
PWM_CH1_CC, PWM_CH1_CC,
PWM_CH2_CC, PWM_CH2_CC,
PWM_CH3_CC, PWM_CH3_CC,
PWM_CH4_CC, PWM_CH4_CC,
PWM_CH5_CC, PWM_CH5_CC,
PWM_CH6_CC, PWM_CH6_CC # GPIO28, GPIO29
]
DREQ_PWM_WRAP0 = 0x18
DREQ_PWM_WRAP1 = 0x19
DREQ_PWM_WRAP2 = 0x1a
DREQ_PWM_WRAP3 = 0x1b
DREQ_PWM_WRAP4 = 0x1c
DREQ_PWM_WRAP5 = 0x1d
DREQ_PWM_WRAP6 = 0x1e
DREQ_PWM_WRAP7 = 0x1f
DREQ_PWM_WRAPn = [
DREQ_PWM_WRAP0, DREQ_PWM_WRAP0, # GPIO00, GPIO01
DREQ_PWM_WRAP1, DREQ_PWM_WRAP1,
DREQ_PWM_WRAP2, DREQ_PWM_WRAP2,
DREQ_PWM_WRAP3, DREQ_PWM_WRAP3,
DREQ_PWM_WRAP4, DREQ_PWM_WRAP4,
DREQ_PWM_WRAP5, DREQ_PWM_WRAP5,
DREQ_PWM_WRAP6, DREQ_PWM_WRAP6,
DREQ_PWM_WRAP7, DREQ_PWM_WRAP7,
DREQ_PWM_WRAP0, DREQ_PWM_WRAP0,
DREQ_PWM_WRAP1, DREQ_PWM_WRAP1,
DREQ_PWM_WRAP2, DREQ_PWM_WRAP2,
DREQ_PWM_WRAP3, DREQ_PWM_WRAP3,
DREQ_PWM_WRAP4, DREQ_PWM_WRAP4,
DREQ_PWM_WRAP5, DREQ_PWM_WRAP5,
DREQ_PWM_WRAP6, DREQ_PWM_WRAP6 # GPIO28, GPIO29
]
BIT_SHIFT = [
0, 16, # GPIO00, GPIO01
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16,
0, 16, # GPIO28, GPIO29
]
F_CPU = 125_000_000 # 125MHz
PWM_FREQ = CARRIER_FREQ * NB_SAMPLES_PER_PERIOD # Hz
OFFSET = (F_CPU / PWM_FREQ) // 2
AMPLITUDE = OFFSET - 1
PWM_PIN = 0
class IrigB122Pwm(IrigB122):
""" IRIG.B122 timecode generator class
"""
def __init__(self, pwmPin):
""" Init generator
"""
super().__init__(OFFSET, AMPLITUDE)
self._pwmPin = pwmPin
self._pwm = machine.PWM(machine.Pin(pwmPin, machine.Pin.OUT), freq=PWM_FREQ, duty_u16=0)
def _dmaWaveConfig(self):
dmaWaveCtrl = self._dmaWave.pack_ctrl(enable=True,
high_pri=True,
size=2,
inc_read=True,
inc_write=False,
ring_size=0, ring_sel=False,
chain_to=self._dmaFrame.channel,
treq_sel=DREQ_PWM_WRAPn[self._pwmPin],
irq_quiet=True, #False,
bswap=False,
sniff_en=False,
write_err=False,
read_err=False)
self._dmaWave.config(write=PWM_CHn_CC[self._pwmPin],
count=NB_PERIODS_PER_BIT*NB_SAMPLES_PER_PERIOD,
ctrl=dmaWaveCtrl)
def test():
"""
# See: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode
# R: reference marker (marker bit)
# I: index marker (0 bit)
# P: position identifier (marker bit)
# E: empty bit (0 bit)
# 0: bit clear (0 bit)
# 1: bit set (1 bit)
# Example: 2024-10-21 14:10:07 (Day of year=295)
timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
# |||| ||| |||| ||| |||| || |||| |||| ||
# 7 0 0 1 4 1 5 9 2
"""
irigB122Pwm = IrigB122Pwm(PWM_PIN)
try:
timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
irigB122Pwm.send(timecode)
while irigB122Pwm.isBusy():
pass
finally:
print("INFO::End")
irigB122Pwm.cancel()
if __name__ == "__main__":
test()
# -*- coding: utf-8 -*-
""" IRIG.B122 timecode generator.
"""
import time
# Time tuple index
YEAR = 0
MONTH = 1
DAY = 2
HOURS = 3
MINUTES = 4
SECONDS = 5
DUMMY = 6
DAY_OF_YEAR = 7
TZ = 8 # only availbale on unix port
DAYS_PER_MONTH = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
class IrigBTimecode:
""" IRIG.B122 timecode builder class
# See: https://en.wikipedia.org/wiki/IRIG_timecode#IRIG_timecode
# R: reference marker (marker bit)
# I: index marker (0 bit)
# P: position identifier (marker bit)
# E: empty bit (0 bit)
# 0: bit clear (0 bit)
# 1: bit set (1 bit)
# Example: 2024-10-21 14:10:07 (Day of year=295)
timecode = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
# |||| ||| |||| ||| |||| || |||| |||| ||
# 7 0 0 1 4 1 5 9 2
"""
def _dec2Bin(self, decimal, digit=4):
"""
"""
return f"{decimal:0{digit}b}"
def computeDayOfYear(self, year, month, day):
""" Compute Day of Year from year, month and day
Note about leap year (https://en.wikipedia.org/wiki/Leap_year#Gregorian_calendar):
Every year that is exactly divisible by four is a leap year, except for years that are exactly
divisible by 100, but these centurial years are leap years if they are exactly divisible by 400.
For example, the years 1700, 1800, and 1900 are not leap years, but the years 1600 and 2000 are.
"""
# Current month
doy = day
# Full months
for m in range(month-1):
doy += DAYS_PER_MONTH[m]
# Leap year
if not year % 4 and year % 100 or not year % 400:
doy += 1
return doy
def __call__(self, t=None):
""" Convert given time to IRIG.B timecode.
@param t: time as returned by time.localtime/mktime(), or by time.time()
If None, use current time
@type t: tuple or int
@return
"""
if t is None:
t = time.localtime()
elif isinstance(t, int) or isinstance(t, float):
t = time.localtime(t)
seconds = t[SECONDS]
minutes = t[MINUTES]
hours = t[HOURS]
dayOfYear = t[DAY_OF_YEAR]
timecode = "R" # start of timecode
# Set seconds
for s in reversed(self._dec2Bin(seconds % 10, 4)):
timecode += s
timecode += 'I' # index marker
for s in reversed(self._dec2Bin(seconds // 10, 3)):
timecode += s
timecode += 'P' # position identifier
# set minutes
for s in reversed(self._dec2Bin(minutes % 10, 4)):
timecode += s
timecode += 'I' # index marker
for s in reversed(self._dec2Bin(minutes // 10, 3)):
timecode += s
timecode += 'E' # empty bit
timecode += 'P' # position identifier
# Set hours
for s in reversed(self._dec2Bin(hours % 10, 4)):
timecode += s
timecode += 'I' # index marker
for s in reversed(self._dec2Bin(minutes // 10, 2)):
timecode += s
timecode += "EE" # empty bits
timecode += 'P' # position identifier
# set day of year - TODO: check if needed
for s in reversed(self._dec2Bin(dayOfYear % 10, 4)):
timecode += s
timecode += 'I' # index marker
for s in reversed(self._dec2Bin((dayOfYear // 10) % 10, 4)):
timecode += s
timecode += 'P' # position identifier
for s in reversed(self._dec2Bin((dayOfYear // 100) % 10, 2)):
timecode += s
# Complete unused part of timecode
timecode += "EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
return timecode
def test():
"""
"""
TIMECODE = "R1110I000P0000I100EP0010I10EEP1010I1001P01EEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEPEEEEEEEEEP"
irigbTimecode = IrigBTimecode()
# time.strptime("2024-10-21 14:10:07", "%Y-%m-%d %H:%M:%S") unsupported under MicroPython
print("2024-10-21 14:10:07 -> doy =", irigbTimecode.computeDayOfYear(2024, 10, 21))
t = (2024, 10, 21, 14, 10, 7, 0, 295) # 1729512607
timecode = irigbTimecode(t)
print(f"original : {TIMECODE}")
print(f"generated: {timecode}")
assert TIMECODE == timecode, "Incorrect timecode!"
if __name__ == "__main__":
test()