#!/usr/bin/env python3
"""Classify NetCDF files as NETCDF4 vs other NetCDF flavours.

This is the slowest step of the pipeline: every candidate file must be
opened with libnetcdf just to read its data model.

usage:
    ./is_nc4.py NS9039K_filelist_20191119T1528.txt.gz [lineStart lineEnd]
    <file-list on stdin> | ./is_nc4.py

Input lines look like "user size hardlinks path"; a bare path also works.
Results are appended to:
    nc4files.txt     - NETCDF4 files
    nc3files.txt     - other NetCDF formats
    err_ncfiles.txt  - files libnetcdf could not open (or that hung)
Progress (last processed line number) is saved to last_line_no.txt so an
interrupted run can resume; non-NetCDF files are not recorded at all.
"""
import sys
import io
import time
import os
import re               # kept for compatibility with earlier revisions
import subprocess as sp  # kept for compatibility; historical dusage helper
import gzip
import gc               # kept for compatibility; manual collection was used once
from datetime import timedelta
# BUGFIX: TimeoutError comes from the public multiprocessing namespace,
# not the private multiprocessing.context module.
from multiprocessing import Pool, TimeoutError

from netCDF4 import Dataset


def check_nc_netcdf4(fn):
    """Classify a single file.

    Returns True when *fn* is a NETCDF4 file, False when it is another
    NetCDF flavour (or does not end in '.nc'), and the string 'ERR' when
    libnetcdf cannot open it.
    """
    if not fn.endswith('.nc'):
        return False
    ds = None
    try:
        # NOTE: opening may segfault for some files (known libnetcdf issue);
        # the caller runs this in a worker process with a timeout for that reason.
        ds = Dataset(fn)
        return 'NETCDF4' in ds.data_model
    except Exception:
        return 'ERR'
    finally:
        # BUGFIX: the original never closed the Dataset, leaking libnetcdf
        # memory across hundreds of thousands of files.
        if ds is not None:
            try:
                ds.close()
            except Exception:
                pass  # best effort: a failed close must not mask the result


def check_nc_netcdf4_Pool4(fns):
    """Classify *fns* concurrently with a 4-worker process pool.

    Each result gets a 3-second timeout; a hung worker (pathological file)
    is recorded as 'ERR'.  Returns a list parallel to *fns* containing
    True / False / 'ERR' as produced by check_nc_netcdf4().
    """
    if not fns:
        return []
    nctypes = []
    # The 'with' block terminates the pool on exit, which also kills any
    # worker left hanging on a pathological file.
    with Pool(4) as pool:
        pending = [pool.apply_async(check_nc_netcdf4, (fn,)) for fn in fns]
        for job in pending:
            try:
                nctypes.append(job.get(timeout=3))
            except TimeoutError:
                nctypes.append('ERR')  # very bad nc file: worker hung
    return nctypes


def _read_db(path):
    """Return the set of newline-terminated lines recorded in *path*.

    Missing files yield an empty set (BUGFIX: the original crashed on the
    very first run when the db files did not exist yet).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return set(f)
    except FileNotFoundError:
        return set()


def _append_db(path, names):
    """Append *names* to *path*, one per line, each newline-terminated.

    BUGFIX: the original wrote '\\n' + '\\n'.join(names), leaving the last
    entry without a trailing newline so the `fn + '\\n' in db` membership
    test missed it on the next run (and the file gained blank lines).
    """
    if names:
        with open(path, 'a', encoding='utf-8') as f:
            f.writelines(n + '\n' for n in names)


def _flush_batch(fns, nc4dbfn, nc3dbfn, errdbfn):
    """Classify *fns* and append each to the matching db file.

    Returns (nc4_count, nc3_count, err_count) for this batch.
    """
    nctypes = check_nc_netcdf4_Pool4(fns)
    nc4s = [fn for fn, t in zip(fns, nctypes) if t is True]
    nc3s = [fn for fn, t in zip(fns, nctypes) if t is False]
    errs = [fn for fn, t in zip(fns, nctypes) if t == 'ERR']
    _append_db(nc4dbfn, nc4s)
    _append_db(nc3dbfn, nc3s)
    _append_db(errdbfn, errs)
    return len(nc4s), len(nc3s), len(errs)


if __name__ == "__main__":
    nc4dbfn = 'nc4files.txt'
    nc3dbfn = 'nc3files.txt'
    errdbfn = 'err_ncfiles.txt'
    lastlinefn = 'last_line_no.txt'
    lineNums = False
    lineStart = lineEnd = 0
    start = time.time()

    if len(sys.argv) > 1:
        # argv[1] is a gzipped "user size hardlinks path" listing.
        gzfilelist = sys.argv[1]
        print('reading file: ' + gzfilelist, end='', flush=True)
        with gzip.open(gzfilelist, 'rt', encoding='utf-8',
                       errors='replace') as gz:
            filelist = gz.readlines()
        print(' in %2.2f secs' % (time.time() - start), flush=True)
        if len(sys.argv) == 4:
            # Explicit [start, end] line window.  BUGFIX: convert to int --
            # the original compared the int counter against the raw argv
            # strings, which raises TypeError on Python 3.
            lineStart = int(sys.argv[2])
            lineEnd = int(sys.argv[3])
            lineNums = True
    else:
        # Stream the listing from stdin, tolerating non-utf8 file names.
        filelist = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8',
                                    errors='replace')

    # Previously classified files; membership keys are newline-terminated.
    nc4db = _read_db(nc4dbfn)
    nc3db = _read_db(nc3dbfn)
    errdb = _read_db(errdbfn)

    # Resume support: skip lines already processed by a previous run,
    # unless an explicit line window was requested.
    if lineNums:
        lastLineNo = 0
    else:
        try:
            with open(lastlinefn, 'r', encoding='utf-8') as f:
                lastLineNo = int(f.read().rstrip())
            print(f"start from line {lastLineNo}")
        except (OSError, ValueError):
            lastLineNo = 0

    counter = 0
    startall = time.time()
    waittocheck = []
    totalline = len(filelist) if isinstance(filelist, list) else 0
    processedFile = 0
    batchn = 5000

    for line in filelist:
        counter += 1
        if counter < lastLineNo:
            continue

        if counter % batchn == 0:
            # Flush the accumulated batch (a no-op when waittocheck is empty).
            print(f'checking {counter}/{totalline} {len(waittocheck)} files',
                  end='')
            ctbegin = time.time()
            n4, n3, nerr = _flush_batch(waittocheck, nc4dbfn, nc3dbfn, errdbfn)
            ctend = time.time()
            waittocheck = []
            # Rough remaining-time estimate (only meaningful when the whole
            # listing was read up front, i.e. totalline is known).
            if totalline:
                processedFile += batchn
                dtall = ctend - startall
                estSec = ((totalline - counter) * dtall // processedFile
                          if processedFile else 0)
            else:
                estSec = -1
            print(f' done in {ctend-ctbegin:.2f}', end='')
            print(f' nc4:{n4}, nc3:{n3}, err:{nerr},'
                  f' estSec: {str(timedelta(seconds=estSec))}')
            # Save progress so an interrupted run can resume here.
            with open(lastlinefn, 'w', encoding='utf-8') as f:
                f.write(str(counter))

        if lineNums:
            if counter < lineStart:
                continue
            if counter > lineEnd:
                break

        try:
            user, size, hlink, fn = line.split(maxsplit=3)
        except ValueError:
            # Not a 4-column listing line: treat the whole line as a path.
            fn = line
            size = '1'
        if not size.isdigit():
            print('ignore parse error at line: %d' % counter)
            continue
        if size == '0':
            continue  # empty file, nothing to classify
        fn = fn.rstrip()
        try:
            if not os.path.exists(fn):
                continue  # listed file no longer present
        except (ValueError, OSError):
            # Path contains characters os.path cannot handle: record and skip.
            with open('isnc4.badfilename.txt', 'a', encoding='utf-8') as f:
                f.write(fn + '\n')
            continue
        if (fn.endswith('.nc')
                and fn + '\n' not in nc4db
                and fn + '\n' not in nc3db
                and fn + '\n' not in errdb):
            waittocheck.append(fn)

    # BUGFIX: flush the trailing partial batch -- the original only flushed
    # at exact multiples of batchn, so the last < 5000 files were silently
    # never classified (and never would be, since resume skipped past them).
    if waittocheck:
        print(f'checking {counter}/{totalline} {len(waittocheck)} files',
              end='')
        ctbegin = time.time()
        n4, n3, nerr = _flush_batch(waittocheck, nc4dbfn, nc3dbfn, errdbfn)
        print(f' done in {time.time()-ctbegin:.2f}'
              f' nc4:{n4}, nc3:{n3}, err:{nerr}')
        with open(lastlinefn, 'w', encoding='utf-8') as f:
            f.write(str(counter))

    endall = time.time()
    print('\n Process done in %2.2f secs' % (endall - startall), flush=True)