# ctypes declarations for the libFLAC stream decoder: library handle,
# constants, exported string tables and structure layouts.
import array
import ctypes
import ctypes.util
import sys
from ctypes import *

# Host byte order, recorded once at import time.  True on little-endian
# machines, False on big-endian ones.
is_little_endian = sys.byteorder == 'little'


def _load_flac_library():
    """Return a CDLL handle for libFLAC.

    The historical name ``libFLAC.so`` is tried first so existing setups keep
    resolving exactly the same library.  When the loader cannot find it,
    ``ctypes.util.find_library('FLAC')`` is consulted for the platform's own
    name.  The original OSError is re-raised when neither approach works.
    """
    try:
        return CDLL('libFLAC.so')
    except OSError:
        located = ctypes.util.find_library('FLAC')
        if located is None:
            raise
        return CDLL(located)


flaclib = _load_flac_library()

# Defines
FLAC__MAX_CHANNELS = 8
FLAC__MAX_LPC_ORDER = 32
FLAC__MAX_FIXED_ORDER = 4

# Data: string tables exported by the library, indexed by status code.
FLAC__StreamDecoderInitStatusString = (c_char_p * 6).in_dll(
    flaclib, 'FLAC__StreamDecoderInitStatusString')
FLAC__StreamDecoderErrorStatusString = (c_char_p * 4).in_dll(
    flaclib, 'FLAC__StreamDecoderErrorStatusString')

# Enums
FLAC__STREAM_DECODER_INIT_STATUS_OK = 0

FLAC__FRAME_NUMBER_TYPE_FRAME_NUMBER = 0
FLAC__FRAME_NUMBER_TYPE_SAMPLE_NUMBER = 1

FLAC__SUBFRAME_TYPE_CONSTANT = 0
FLAC__SUBFRAME_TYPE_VERBATIM = 1
FLAC__SUBFRAME_TYPE_FIXED = 2
FLAC__SUBFRAME_TYPE_LPC = 3


class Structure(ctypes.Structure):
    """``ctypes.Structure`` with a field-by-field ``repr``.

    The class intentionally keeps the name ``Structure`` so that every layout
    class declared after it in this module inherits the readable repr.
    """

    def __repr__(self):
        cls = self.__class__
        # _fields_ entries are (name, type) or (name, type, bits); only the
        # name is needed.  Opaque structures declare no _fields_ at all.
        pairs = ['%s: %r' % (field[0], getattr(self, field[0]))
                 for field in getattr(self, '_fields_', ())]
        return '<%s.%s: %s>' % (cls.__module__, cls.__name__,
                                ', '.join(pairs))


# Each class below declares the ctypes layout of the libFLAC C type that
# shares its name.

class FLAC__StreamMetadata_StreamInfo(Structure):
    _fields_ = [
        ('min_blocksize', c_uint),
        ('max_blocksize', c_uint),
        ('min_framesize', c_uint),
        ('max_framesize', c_uint),
        ('sample_rate', c_uint),
        ('channels', c_uint),
        ('bits_per_sample', c_uint),
        ('total_samples', c_uint64),
        ('md5sum', c_byte * 16),
    ]


class FLAC__StreamMetadataData(Union):
    # Only the stream_info member of the metadata union is declared here.
    _fields_ = [
        ('stream_info', FLAC__StreamMetadata_StreamInfo),
    ]


class FLAC__StreamMetadata(Structure):
    _fields_ = [
        ('type', c_int),
        ('is_last', c_int),
        ('length', c_uint),
        ('data', FLAC__StreamMetadataData),
    ]


class FLAC__EntropyCodingMethod_PartitionedRiceContents(Structure):
    # Opaque: only ever referenced through a pointer.
    pass


class FLAC__EntropyCodingMethod_PartitionedRice(Structure):
    _fields_ = [
        ('order', c_uint),
        ('contents',
         POINTER(FLAC__EntropyCodingMethod_PartitionedRiceContents)),
    ]


class FLAC__EntropyCodingMethod(Structure):
    _fields_ = [
        ('type', c_int),
        ('partitioned_rice', FLAC__EntropyCodingMethod_PartitionedRice),
    ]


class FLAC__Subframe_Constant(Structure):
    _fields_ = [
        ('value', c_int32),
    ]
# Each layout class below declares the ctypes layout of the libFLAC C type
# that shares its name.

class FLAC__Subframe_Verbatim(Structure):
    _fields_ = [
        ('data', POINTER(c_int32)),
    ]


class FLAC__Subframe_Fixed(Structure):
    _fields_ = [
        ('entropy_coding_method', FLAC__EntropyCodingMethod),
        ('order', c_uint),
        ('warmup', c_int32 * FLAC__MAX_FIXED_ORDER),
        ('residual', POINTER(c_int32)),
    ]


class FLAC__Subframe_LPC(Structure):
    _fields_ = [
        ('entropy_coding_method', FLAC__EntropyCodingMethod),
        ('order', c_uint),
        ('qlp_coeff_precision', c_uint),
        ('quantization_level', c_int),
        ('qlp_coeff', c_int32 * FLAC__MAX_LPC_ORDER),
        ('warmup', c_int32 * FLAC__MAX_LPC_ORDER),
        ('residual', POINTER(c_int32)),
    ]


class FLAC__SubframeUnion(Union):
    _fields_ = [
        ('constant', FLAC__Subframe_Constant),
        ('fixed', FLAC__Subframe_Fixed),
        ('lpc', FLAC__Subframe_LPC),
        ('verbatim', FLAC__Subframe_Verbatim),
    ]


class FLAC__Subframe(Structure):
    _fields_ = [
        ('type', c_int),
        ('data', FLAC__SubframeUnion),
        ('wasted_bits', c_uint),
    ]


class number_union(Union):
    _fields_ = [
        ('frame_number', c_uint32),
        ('sample_number', c_uint64),
    ]


class FLAC__FrameHeader(Structure):
    _fields_ = [
        ('blocksize', c_uint),
        ('sample_rate', c_uint),
        ('channels', c_uint),
        ('channel_assignment', c_int),
        ('bits_per_sample', c_uint),
        ('number_type', c_int),
        ('number', number_union),
        ('crc', c_uint8),
    ]


class FLAC__FrameFooter(Structure):
    _fields_ = [
        ('crc', c_uint16),
    ]


class FLAC__Frame(Structure):
    _fields_ = [
        ('header', FLAC__FrameHeader),
        ('subframes', FLAC__Subframe * FLAC__MAX_CHANNELS),
        ('footer', FLAC__FrameFooter),
    ]


class FLAC__StreamDecoder(Structure):
    # Opaque handle: only ever used through FLAC__StreamDecoder_p.
    pass


# Types
FLAC__StreamMetadata_p = POINTER(FLAC__StreamMetadata)
FLAC__Frame_p = POINTER(FLAC__Frame)
FLAC__StreamDecoder_p = POINTER(FLAC__StreamDecoder)

# Function typedefs.  Every callback receives the decoder as its first
# argument and the client-data pointer as its last.
FLAC__StreamDecoderReadCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, POINTER(c_byte), POINTER(c_size_t),
    c_void_p)
FLAC__StreamDecoderSeekCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, c_uint64, c_void_p)
FLAC__StreamDecoderTellCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, POINTER(c_uint64), c_void_p)
FLAC__StreamDecoderLengthCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, POINTER(c_uint64), c_void_p)
FLAC__StreamDecoderEofCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, c_void_p)
FLAC__StreamDecoderWriteCallback = CFUNCTYPE(
    c_int, FLAC__StreamDecoder_p, FLAC__Frame_p,
    POINTER(POINTER(c_int32)), c_void_p)
FLAC__StreamDecoderMetadataCallback = CFUNCTYPE(
    None, FLAC__StreamDecoder_p, FLAC__StreamMetadata_p, c_void_p)
FLAC__StreamDecoderErrorCallback = CFUNCTYPE(
    None, FLAC__StreamDecoder_p, c_int, c_void_p)

# Prototype table: exported function name -> (restype, argtypes).
funs = {
    'FLAC__stream_decoder_new': (FLAC__StreamDecoder_p, []),
    'FLAC__stream_decoder_delete': (None, [FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_set_md5_checking': (c_int, [
        FLAC__StreamDecoder_p, c_int, ]),
    'FLAC__stream_decoder_set_metadata_respond_all': (c_int, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_total_samples': (c_uint64, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_channels': (c_uint, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_channel_assignment': (c_int, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_bits_per_sample': (c_uint, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_sample_rate': (c_uint, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_get_blocksize': (c_uint, [
        FLAC__StreamDecoder_p, ]),
    # The trailing c_void_p is the client_data argument of the C prototype.
    'FLAC__stream_decoder_init_stream': (c_int, [
        FLAC__StreamDecoder_p,
        FLAC__StreamDecoderReadCallback,
        FLAC__StreamDecoderSeekCallback,
        FLAC__StreamDecoderTellCallback,
        FLAC__StreamDecoderLengthCallback,
        FLAC__StreamDecoderEofCallback,
        FLAC__StreamDecoderWriteCallback,
        FLAC__StreamDecoderMetadataCallback,
        FLAC__StreamDecoderErrorCallback,
        c_void_p, ]),
    # C prototype: (decoder, filename, write_cb, metadata_cb, error_cb,
    # client_data).  FLACDec.__init__ passes all six.
    'FLAC__stream_decoder_init_file': (c_int, [
        FLAC__StreamDecoder_p,
        c_char_p,
        FLAC__StreamDecoderWriteCallback,
        FLAC__StreamDecoderMetadataCallback,
        FLAC__StreamDecoderErrorCallback,
        c_void_p, ]),
    'FLAC__stream_decoder_finish': (c_int, [FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_flush': (c_int, [FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_reset': (c_int, [FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_process_single': (c_int, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_process_until_end_of_metadata': (c_int, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_process_until_end_of_stream': (c_int, [
        FLAC__StreamDecoder_p, ]),
    'FLAC__stream_decoder_seek_absolute': (c_int, [
        FLAC__StreamDecoder_p, c_uint64, ]),
}

# Install the prototypes on the library's function objects.
for i in funs:
    f = getattr(flaclib, i)
    f.restype, f.argtypes = funs[i]


def clientdatawrapper(fun):
    """Adapt *fun* to a libFLAC callback signature.

    The returned callable drops the first argument (the decoder handle) and
    the last one (client data) and forwards whatever is in between to *fun*,
    returning its result.
    """
    def newfun(*args):
        return fun(*args[1:-1])
    return newfun


def _status_text(table, index):
    """Return entry *index* of a libFLAC status-string table as ``str``.

    An index outside the table yields a generic message instead of an
    IndexError, since a library may report codes beyond the declared table.
    """
    if not 0 <= index < len(table):
        return 'unknown status %d' % index
    text = table[index]
    if text is not None and not isinstance(text, str):
        # Python 3 returns c_char_p values as bytes.
        text = text.decode('ascii', 'replace')
    return text


class FLACDec(object):
    """Decode a FLAC file through libFLAC's stream decoder.

    Only 16 bit streams are handled by the write callback.  ``read`` returns
    interleaved 16 bit samples as a byte string; the sample array is
    byte-swapped on little-endian hosts before it is serialised.
    """

    channels = property(lambda x: x._channels)
    samplerate = property(lambda x: x._samplerate)
    bitspersample = property(lambda x: x._bitspersample)
    totalsamples = property(lambda x: x._totalsamples)
    bytespersample = property(lambda x: x._bytespersample)

    def __len__(self):
        """Return the total sample count taken from the stream info block."""
        return int(self._totalsamples)

    def __init__(self, file):
        """Open *file* and process its metadata.

        Raises RuntimeError when the decoder cannot be allocated and
        ValueError when initialisation or metadata parsing fails.
        """
        self.flacdec = None
        self._lasterror = None
        self.write_wrap = None
        self.metadata_wrap = None
        self.error_wrap = None
        self.curcnt = 0
        self.cursamp = 0
        self.curdata = array.array('h')

        decoder = flaclib.FLAC__stream_decoder_new()
        # A NULL result comes back as a false pointer instance; it never
        # compares equal to the integer 0.
        if not decoder:
            raise RuntimeError('allocating decoder')
        self.flacdec = decoder

        # The C prototype takes a char *, i.e. bytes.
        if not isinstance(file, bytes):
            import sys
            file = file.encode(sys.getfilesystemencoding() or 'utf-8')

        # We need to keep references to the callback functions
        # around so they won't be garbage collected.
        self.write_wrap = FLAC__StreamDecoderWriteCallback(
            clientdatawrapper(self.cb_write))
        self.metadata_wrap = FLAC__StreamDecoderMetadataCallback(
            clientdatawrapper(self.cb_metadata))
        self.error_wrap = FLAC__StreamDecoderErrorCallback(
            clientdatawrapper(self.cb_error))

        flaclib.FLAC__stream_decoder_set_md5_checking(self.flacdec, True)

        status = flaclib.FLAC__stream_decoder_init_file(
            self.flacdec, file, self.write_wrap, self.metadata_wrap,
            self.error_wrap, None)
        if status != FLAC__STREAM_DECODER_INIT_STATUS_OK:
            self._release()
            raise ValueError(
                _status_text(FLAC__StreamDecoderInitStatusString, status))

        print('init')
        flaclib.FLAC__stream_decoder_process_until_end_of_metadata(
            self.flacdec)
        print('end of meta')
        if self._lasterror is not None:
            self._release()
            raise ValueError(_status_text(
                FLAC__StreamDecoderErrorStatusString, self._lasterror))

    def _release(self):
        """Delete the native decoder and drop the callback references."""
        flaclib.FLAC__stream_decoder_delete(self.flacdec)
        self.flacdec = None
        self.write_wrap = None
        self.metadata_wrap = None
        self.error_wrap = None

    def close(self):
        """Finish decoding and free the decoder; a second call is a no-op."""
        if self.flacdec is None:
            return

        print('delete called')
        # finish() returns false when MD5 checking found a mismatch.  That
        # result is deliberately ignored, so close() never raises for it.
        flaclib.FLAC__stream_decoder_finish(self.flacdec)
        self._release()

    def cb_write(self, frame_p, buffer_pp):
        """Write callback: stash one decoded 16 bit frame.

        Sets ``cursamp`` to the frame's sample number, ``curcnt`` to its block
        size and ``curdata`` to the channel-interleaved samples.  Frames with
        any other bit depth are reported on stdout and skipped.  Always
        returns 0.
        """
        frame = frame_p[0]
        header = frame.header
        nchan = header.channels
        if header.bits_per_sample == 16:
            blocksize = header.blocksize
            self.cursamp = header.number.sample_number
            self.curcnt = blocksize
            # Interleave: sample 0 of every channel, then sample 1, ...
            channel_bufs = [buffer_pp[c] for c in range(nchan)]
            self.curdata = array.array('h', [
                buf[idx] for idx in range(blocksize)
                for buf in channel_bufs])
            if is_little_endian:
                self.curdata.byteswap()
        else:
            print('ERROR!')

        return 0

    def cb_metadata(self, metadata_p):
        """Metadata callback: record the stream parameters.

        Only blocks whose type is 0 are used; their ``stream_info`` member
        supplies the channel count, sample rate, bit depth and total samples.
        """
        md = metadata_p[0]
        print('metadata: ' + repr(md))
        if md.type == 0:
            si = md.data.stream_info
            self._channels = si.channels
            self._samplerate = si.sample_rate
            self._bitspersample = si.bits_per_sample
            self._totalsamples = si.total_samples
            # Floor division keeps this an int on Python 3 as well.
            self._bytespersample = si.channels * si.bits_per_sample // 8
            print(repr(si))

    def cb_error(self, errstatus):
        """Error callback: remember the most recent error status code."""
        self._lasterror = errstatus

    def goto(self, pos):
        """Position the decoder at sample *pos*, clamped to the stream.

        A native seek is tried first.  If it fails, the decoder is reset when
        the target lies behind the current position, and the distance is then
        covered by decoding and discarding samples.

        Raises ValueError when closed and RuntimeError when the reset fails.
        """
        if self.flacdec is None:
            raise ValueError('closed')

        pos = max(0, min(self.totalsamples, pos))
        if flaclib.FLAC__stream_decoder_seek_absolute(self.flacdec, pos):
            return

        # the slow way
        # NOTE(review): libFLAC documents that a decoder left in the
        # SEEK_ERROR state must be flushed or reset before decoding resumes.
        # Only the backward case resets here - confirm whether the forward
        # case needs it too.
        if self.cursamp > pos:
            if not flaclib.FLAC__stream_decoder_reset(self.flacdec):
                raise RuntimeError('unable to seek to beginin')
            flaclib.FLAC__stream_decoder_process_until_end_of_metadata(
                self.flacdec)
            flaclib.FLAC__stream_decoder_process_single(self.flacdec)

        remaining = pos - self.cursamp
        while remaining > 0:
            step = min(remaining, 512 * 1024)
            self.read(step)
            remaining -= step

    def read(self, nsamp):
        """Return up to *nsamp* samples per channel as a byte string.

        The count is clamped to the samples left in the stream.  Fewer
        samples are returned when the decoder stops delivering frames.

        Raises ValueError when closed and RuntimeError when libFLAC reports a
        decoding failure.
        """
        if self.flacdec is None:
            raise ValueError('closed')

        chunks = []
        nsamp = max(0, min(nsamp, self.totalsamples - self.cursamp))
        while nsamp:
            if self.curcnt == 0:
                # Buffer drained: decode the next frame.
                if not flaclib.FLAC__stream_decoder_process_single(
                        self.flacdec):
                    raise RuntimeError('error decoding frame')
                if self.curcnt == 0:
                    # No frame was delivered; stop instead of spinning.
                    break
                continue

            cnt = min(nsamp, self.curcnt)
            sampcnt = cnt * self.channels
            piece = self.curdata[:sampcnt]
            # array.tostring() is gone from recent Python 3 releases.
            to_bytes = getattr(piece, 'tobytes', None)
            if to_bytes is None:
                to_bytes = piece.tostring
            chunks.append(to_bytes())

            self.cursamp += cnt
            self.curcnt -= cnt
            self.curdata = self.curdata[sampcnt:]
            nsamp -= cnt

        return b''.join(chunks)


if __name__ == '__main__':
    import sys

    d = FLACDec(sys.argv[1])
    print('total samples: %s' % (d.totalsamples,))
    print('channels: %s' % (d.channels,))
    print('rate: %s' % (d.samplerate,))
    print('bps: %s' % (d.bitspersample,))
    print(repr(d.read(10)))
    print('going')
    d.goto(d.totalsamples - 128)
    print('here')
    print(repr(d.read(10)))