import numpy as np from gnuradio import gr import pmt class ls_channel_estimation(gr.basic_block): """ docstring for block multiply_py_ff """ def __init__(self,sync_symbol1,sync_symbol2,n_data_symbols,max_carr_offset,force_one_sync_symbol,cp_len): gr.basic_block.__init__(self, name="LS Channel Estimation block", in_sig=[np.dtype((np.complex64,len(sync_symbol1)))], out_sig=[np.dtype((np.complex64,len(sync_symbol1)))]) self.fft_len=len(sync_symbol1) self.n_data_syms=n_data_symbols self.n_sync_syms=1 self.sync_sym1=sync_symbol1[:] self.sync_sym2=sync_symbol2[:] self.ref_sym = self.sync_sym2[:] if (len(self.sync_sym2)>0 and not force_one_sync_symbol) else self.sync_sym1[:] self.corr_v=sync_symbol2[:] self.known_symbol_diffs=None self.new_symbol_diffs=None self.first_active_carrier=0 self.last_active_carrier=len(sync_symbol2)-1 self.interpolate=False self.set_tag_propagation_policy(gr.TPP_DONT) self.set_output_multiple(self.n_data_syms) self.set_relative_rate((1.0*self.n_data_syms) / (self.n_data_syms+self.n_sync_syms)) self.cp_len=cp_len self.force_one_sync_symbol=force_one_sync_symbol self.max_carr_offset=max_carr_offset # set index of first and last active carrier for i in range(self.fft_len): if(self.ref_sym[i]!=0): self.first_active_carrier=i break for i in range(self.fft_len-1,-1,-1): if(self.ref_sym[i]!=0): self.last_active_carrier=i break # sanity checks if(len(self.sync_sym2)>0): if(len(self.sync_sym1)!=len(self.sync_sym2)): print "syn symbols must have equal lengths. " exit(0) if(not self.force_one_sync_symbol): self.n_sync_syms=2 else: if(self.sync_sym1[self.first_active_carrier+1]==0): self.last_active_carrier+=1 self.interpolate=True self.max_neg_carr_offset=-1*self.first_active_carrier self.max_pos_carr_offset=self.fft_len-self.last_active_carrier-1 if(self.max_carr_offset!=-1): self.max_neg_carr_offset=max(-1*self.max_carr_offset,self.max_neg_carr_offset) self.max_pos_carr_offset=max(self.max_carr_offset,self.max_pos_carr_offset) if(self.max_neg_carr_offset%2): self.max_neg_carr_offset+=1 if(self.max_pos_carr_offset%2): self.max_pos_carr_offset-=1 if(self.n_sync_syms==2): for i in range(self.fft_len): if(self.sync_sym1[i]==0): self.corr_v[i]=0+0j else: self.corr_v[i]/=self.sync_sym1[i] else: self.corr_v=None self.known_symbol_diffs=self.fft_len*[0+0j] self.new_symbol_diffs=self.fft_len*[0+0j] for i in range(self.first_active_carrier,self.last_active_carrier-2,2): self.known_symbol_diffs[i]=abs(self.sync_sym1[i]-self.sync_sym1[i+2]) def get_carr_offset(self,d_sync_sym1,d_sync_sym2): carr_offset=0 if(self.corr_v != None): #use Schmidl * Cox method Bg_max=0 # g here is 2g in the paper for g in range(self.max_neg_carr_offset,self.max_pos_carr_offset+1,2): tmp=0+0j for k in range(self.fft_len): if(self.corr_v[k]!=0): tmp+=d_sync_sym1[k+g].conjugate() * self.corr_v[k].conjugate() * d_sync_sym2[k+g] if(abs(tmp)>Bg_max): Bg_max=abs(tmp) carr_offset=g else: #Correlate for i in range(self.fft_len-2): self.new_symbol_diffs[i]=abs(d_sync_sym1[i]-d_sync_sym1[i+2]) sum_corr=0; max_corr=0 for g in range(self.max_neg_carr_offset,self.max_pos_carr_offset+1,2): sum_corr=0 for j in range(self.fft_len): if(self.known_symbol_diffs[j]): sum_corr+=(self.known_symbol_diffs[j]*self.new_symbol_diffs[j+g]) if(sum_corr>max_corr): max_corr=sum_corr carr_offset=g print "carrier offset:", carr_offset return carr_offset def get_chan_taps(self,d_sync_sym1,d_sync_sym2,carr_offset,taps): sym=d_sync_sym2 if self.n_sync_syms==2 else d_sync_sym1 loop_start=0 loop_end=self.fft_len if(carr_offset > 0): loop_start=carr_offset elif(carr_offset<0): loop_end=self.fft_len+carr_offset for i in range(loop_start,loop_end): if(self.ref_sym[i-carr_offset]!=0): taps[i-carr_offset]=sym[i]/self.ref_sym[i-carr_offset] if(self.interpolate): for i in range(self.first_active_carrier+1,self.last_active_carrier,2): taps[i]=taps[i-1] taps[self.last_active_carrier]=taps[self.last_active_carrier-1] def forecast(self , noutput_items, ninput_items_required): ninput_items_required[0] = (noutput_items/self.n_data_syms) * (self.n_data_syms+self.n_sync_syms) #operate on per frame basis def general_work(self, input_items, output_items): in0=input_items[0] out0=output_items[0] data_buffer=None framesize=self.n_sync_syms+self.n_data_syms if(len(in0)