Python 通用数据格式转换工具

转载自: http://www.wklken.me/posts/2011/12/10/python-dataformat.html

涉及模块 os, getopt, sys

需求

在进行hadoop测试时,需要造大量数据,例如某个表存在56列,但实际程序逻辑只适用到某几列,我们造的数据 也只需要某几列

构造几列数据,转化为对应数据表格式

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#dataformat.py
# wklken@yeah.net
#this script change data from your source to the dest data format
#2011-08-05 created version0.1
#2011-10-29 add row-row mapping ,default row value .rebuild all functions. version0.2
#next:add data auto generate by re expression
#2011-12-17 add new functions, add timestamp creator. version0.3
#2012-03-08 rebuild functions. version0.4
#2012-06-22 add function to support multi output separators
#2012-07-11 fix bug line 44,add if
#2012-09-03 rebuild functions,add help msg! version0.5
#2012-11-08 last version edited by lingyue.wkl
# this py: https://github.com/wklken/pytools/blob/master/data_process/dataformat.py
import os
import sys
import getopt
import time
import re
#read file and get each line without n
def read_file(path):
f = open(path, "r")
lines = f.readlines()
f.close()
return [line[:-1] for line in lines ]
#处理一行,转为目标格式,返回目标行
def one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno):
outline = []
#step1.获取每一列的值
for i in range(1, total + 1):
if i in ft_map:
fill_index = ft_map[i]
#加入使用默认值列 若是以d开头,后面是默认,否则取文件对应列 done
if fill_index.startswith("d"):
#列默认值暂不开启时间戳处理
outline.append(fill_index[1:])
else:
outline.append(handler_specal_part(parts[int(fill_index) - 1]))
else:
#-s 选项生效,填充列号
if fill_with_sno:
outline.append(str(i))
#否则,填充默认填充值
else:
outline.append(empty_fill)
#step2.组装加入输出分隔符,支持多分隔符
default_outsp = outsp.get(0,"t")
result = []
outsize = len(outline)
for i in range(outsize):
result.append(outline[i])
if i outsize - 1:
result.append(outsp.get(i + 1, default_outsp))
#step3.拼成一行返回
return ''.join(result)
#处理入口,读文件,循环处理每一行,写出
#输入数据分隔符默认t,输出数据默认分隔符t
def process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out):
ft_map = {}
#有效输入字段数(去除默认值后的)
in_count = 0
used_row = []
#step1-3相当于数据预处理,解析传入选项
#step1 处理映射列 不能和第二步合并
for to_row in to:
if r":" not in to_row and len(to_row.split(":")) == 2:
used_row.append(int(to_row.split(":")[1]))
if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2:
pass
else:
in_count += 1
#step2 处理默认值列
for to_row in to:
#处理默认值列
if r"=" not in str(to_row) and len(str(to_row).split("=")) == 2:
ft_map.update({int(to_row.split("=")[0]): "d"+to_row.split("=")[1]})
continue
#处理列列映射
elif r":" not in to_row and len(to_row.split(":")) == 2:
ft_map.update({int(to_row.split(":")[0]): to_row.split(":")[1]})
continue
#其他普通列
else:
to_index = 0
for i in range(1, total + 1):
if i not in used_row:
to_index = i
break
ft_map.update({int(to_row): str(to_index)})
used_row.append(to_index)
#setp3 处理输出分隔符 outsp 0=t,1= 0代表默认的,其他前面带列号的代表指定的
if len(outsp) > 1 and len(outsp.split(",")) > 1:
outsps = re.findall(r"d=.+?", outsp)
outsp = {}
for outsp_kv in outsps:
k,v = outsp_kv.split("=")
outsp.update({int(k): v})
else:
outsp = {0: outsp}
#step4 开始处理每一行
lines = read_file(inpath)
f = open(outpath, "w")
result = []
for line in lines:
#多个输入分隔符情况,使用正则切分成列
if len(insp.split("|")) > 0:
parts = re.split(insp, line)
#否则使用正常字符串切分成列
else:
parts = line.split(insp)
#正常的,切分后字段数大于等于配置的选项个数
if len(parts) >= in_count:
outline = one_line_proc(parts, total, ft_map, outsp, empty_fill, fill_with_sno)
result.append(outline + "n")
#不正常的,列数少于配置
else:
#若配置了-e 输出,否则列数不符的记录过滤
if error_line_out:
result.append(line + "n")
#step5 输出结果
f.writelines(result)
f.close()
#特殊的处理入口,处理维度为每一行,目前只有时间处理
def handler_specal_part(part_str):
#timestamp 时间处理
#时间列,默认必须 TS数字=时间
if part_str.startswith("TS") and "=" in part_str:
ts_format = {8: "%Y%m%d",
10: "%Y-%m-%d",
14: "%Y%m%d%H%M%S",
19: "%Y-%m-%d %H:%M:%S"}
to_l = 0
#step1 确认输出的格式 TS8 TS10 TS14 TS19
if part_str[2] != "=":
to_l = int(part_str[2:part_str.index("=")])
part_str = part_str.split("=")[1].strip()
interval = 0
#step2 存在时间+-的情况 确认加减区间
if "+" in part_str:
inputdate = part_str.split("+")[0].strip()
interval = int(part_str.split("+")[1].strip())
elif "-" in part_str:
parts = part_str.split("-")
if len(parts) == 2: #20101020 - XX
inputdate = parts[0].strip()
interval = -int(parts[1].strip())
elif len(parts) == 3: #2010-10-20
inputdate = part_str
elif len(parts) == 4: #2010-10-20 - XX
inputdate = "-".join(parts[:-1])
interval = -int(parts[-1])
else:
inputdate = part_str
else:
inputdate = part_str.strip()
#step3 将原始时间转为目标时间
part_str = get_timestamp(inputdate, ts_format, interval)
#step4 如果定义了输出格式,转换成目标格式,返回
if to_l > 0:
part_str = time.strftime(ts_format.get(to_l), time.localtime(int(part_str)))
return part_str
#将时间由秒转化为目标格式
def get_timestamp(inputdate, ts_format, interval=0):
if "now()" in inputdate:
inputdate = time.strftime("%Y%m%d%H%M%S")
inputdate = inputdate.strip()
try:
size = len(inputdate)
if size in ts_format:
ts = time.strptime(inputdate, ts_format.get(size))
else:
print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' "
sys.exit(0)
except:
print "the input date and time expression error,only allow 'YYYYmmdd[HHMMSS]' or 'YYYY-MM-DD HH:MM:SS' "
sys.exit(0)
return str(int(time.mktime(ts)) + interval)
#打印帮助信息
def help_msg():
print("功能:原数据文件转为目标数据格式")
print("选项:")
print("t -i inputfilepath [必输,input, 原文件路径]")
print("t -t n [必输,total, n为数字,目标数据总的域个数]")
print("t -a '1,3,4' [必输,array, 域编号字符串,逗号分隔。指定域用原数据字段填充,未指定用'0'填充]")
print("t -a '3,5=abc,6:2' 第5列默认值abc填充,第6列使用输入的第1列填充,第3列使用输入第1列填充")
print("t -o outputfilepath [可选,output, 默认为 inputfilepath.dist ]")
print("t -F 'FS' [可选,field Sep,原文件域分隔符,默认为\t,支持多分隔符,eg.'t|||' ]")
print("t -P 'OFS' [可选,out FS,输出文件的域分隔符,默认为\t,可指定多个,多个需指定序号=分隔符,逗号分隔,默认分隔符序号0 ]")
print("t -f 'fill_str' [可选,fill,未选列的填充值,默认为空 ]")
print("t -s [可选,serial number,当配置时,-f无效,使用列号填充未指派的列]")
print("t -e [可选,error, 源文件列切分不一致行/空行/注释等,会被直接输出,正确行按原逻辑处理]")
sys.exit(0)
#判断某个参数必须被定义
def must_be_defined(param, map, error_info):
if param not in map:
print error_info
sys.exit(1)
#程序入口,读入参数,执行
def main():
#init default value
insp = "t"
outsp = "t"
empty_fill = ''
fill_with_sno = False
error_line_out = False
#handle options
try:
opts,args = getopt.getopt(sys.argv[1:],"F:P:t:a:i:o:f:hse")
for op,value in opts:
if op in ("-h", "-H", "--help"):
help_msg()
if op == "-i":
inpath = value
elif op == "-o":
outpath = value
elif op == "-t":
total = int(value)
elif op == "-a":
to = value.split(",")
elif op == "-F":
insp = value.decode("string_escape")
elif op == "-P":
outsp = value.decode("string_escape")
elif op == "-f":
empty_fill = value
elif op == "-s":
fill_with_sno = True
elif op == "-e":
error_line_out = True
if len(opts) 3:
print(sys.argv[0]+" : the amount of params must great equal than 3")
print("Command : ./dataformat.py -h")
sys.exit(1)
except getopt.GetoptError:
print(sys.argv[0]+" : params are not defined well!")
print("Command : ./dataformat.py -h")
sys.exit(1)
params_map = dir()
must_be_defined('inpath', params_map, sys.argv[0]+" : -i param is needed,input file path must define!")
must_be_defined('total', params_map, sys.argv[0]+" : -t param is needed,the fields of result file must define!")
must_be_defined('to', params_map, sys.argv[0]+" : -a param is needed,must assign the field to put !")
if not os.path.exists(inpath):
print(sys.argv[0]+" file : %s is not exists"%inpath)
sys.exit(1)
if 'outpath' not in dir():
outpath = inpath+".dist"
process(inpath, total, to, outpath, insp, outsp, empty_fill, fill_with_sno, error_line_out)
if __name__ =="__main__":
main()

使用说明

功能:可指定输入分隔,输出分隔,无配置字段填充,某列默认值,可按顺序填充,也可乱序映射填充

输入:输入文件路径

选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-i “path”
必设
输入文件路径
-t n
必设
目标数据表总列数
-a “r1,r2”
必设
将要填充的列号列表,可配置默认值,可配置映射
-o “path”
可选
输出文件路径,默认为 输入文件路径.dist
-F “IFS”
可选
输入文件中字段域分隔符,默认t
-P ”OFS”
可选
输出文件中字段域分隔符,默认t
-f “”
可选
指定未配置列的填充内容,默认为空
-h
单独
查看帮助信息

列填充的配置示例:

普通用法【最常用】

命令:

1
./dataformat.pyi in_file –t 65 -a22,39,63” –F “^I” –P “^A” –f “0

说明:

1
2
3
in_file中字段是以\\t分隔的[可不配-F,使用默认]。
将in_file的第1,2,3列分别填充到in_file.dist[use default]的第22,39,63
in_file.dist共65列,以^A分隔,未配置列以0填充-a中顺序与源文件列序有关,若-a “39,22,63” 则是将第1列填充到第39列,第2列填充到第22列,第3列填充到第63

列默认值用法:【需要对某些列填充相同的值,但不想在源文件中维护】

1
./dataformat.py -i in_file –t 30 –a “3=tag_1,9,7,12=0.0” –o out_file

说明:

1
2
3
4
in_file以t分隔,输出out_file以t分隔
in_file的第1列,第2列填充到out_file的第9列,第7
out_file共30列,第3列均用字符串”tag_1”填充,第12列用0.0填充,其他未配置列为空
注意:默认值 的取值,若是使用到等号和冒号,需转义,加 = :

列乱序映射

命令:

1
./dataformat.pyi in_file –t 56a3:2,9,5:3,1=abc,11

说明:

1
2
3
4
5
6
分隔,输入,输出,同上…..
冒号前面为输出文件列号,后面为输入文件列号
目标文件第3列用输入文件第2列填充,目标文件第5列用输入文件第3列填充
目标文件第一列均填充“abc”
目标文件第9列用输入文件第1列填充,第11列用输入文件第4列填充【未配置映射,使用从头开始还没有被用过的列】
脚本会对简单的字段数量等映射逻辑进行检测,复杂最好全配上,使用默认太抽象

参考资料

[1] PYTHON通用数据格式转换工具
[2] dataformat - github