# 此文件读取 ODPS 表中的字段，再调用某接口，返回生成的 url 链接。
# 原表字段为 id、content；需要产生的新数据为原 id 及其对应的 url。
# -*- coding: utf-8 -*-
# @Time : 2021/12/6 9:59
# @Author : llh
# @File : test_api.py
# @Synopsis : 此文件读取 ODPS 表中的字段，再调用某接口，返回生成的 url 链接
import json
import os
import re

import pandas as pd
import requests
from odps import ODPS
from odps import options
# NOTE(review): private pandas path; collections.defaultdict is the public equivalent.
from pandas._libs.internals import defaultdict
# ODPS options: submit SQL in script mode, and disable the restricted
# Instance Tunnel read mode (capped at ~10000 rows) so that open_reader
# can return the full result set.
options.sql.settings = {"odps.sql.submit.mode": "script"}
options.tunnel.limit_instance_tunnel = False
# ODPS connection. Credentials, project and endpoint are taken from the
# environment when set, so secrets need not be committed to the source;
# otherwise the original placeholders are used.
odps_connect = ODPS(
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'AccessKey ID'),
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'AccessKey Secret'),
    os.environ.get('ODPS_PROJECT', 'your_Project'),
    endpoint=os.environ.get('ODPS_ENDPOINT',
                            'http://service.cn-xxxxxxxxxxxxxxxxxxxx/api'),
)
def get_new_partition(odps_connect, table_name='your_table'):
    """Return the newest partition of ``table_name``, split into its parts.

    Reading only the newest partition on each run lets the script be
    scheduled as an incremental job. The source table is partitioned as
    ``ds = yyyymmdd, hh = hours, mm = minutes``; adjust the slicing below if
    your table's partition layout differs.

    Args:
        odps_connect: ODPS entry object used to look up the table.
        table_name: Name of the partitioned table to inspect.

    Returns:
        A tuple ``(ds, hh, mm, latest)``, e.g.
        ``('20211206', '10', '00', '202112061000')``, where ``latest`` is all
        digits of the newest partition spec concatenated.

    Raises:
        ValueError: If the table is not partitioned or has no partitions.
    """
    table = odps_connect.get_table(table_name)
    if not table.schema.partitions:
        raise ValueError("table {!r} is not partitioned".format(table_name))

    # Keep only the digits of each partition name (presumably of the form
    # "ds='20211206',hh='10',mm='00'"). With fixed-width fields the resulting
    # key, e.g. '202112061000', orders chronologically as a plain string.
    digits = re.compile(r'\d+')
    keys = [''.join(digits.findall(partition.name))
            for partition in table.partitions]
    if not keys:
        raise ValueError("table {!r} has no partitions".format(table_name))

    latest = max(keys)
    print("ods最新分区:" + latest)
    ds_partion = latest[0:8]
    hh_partion = latest[8:10]
    mm_partion = latest[10:12]
    print(ds_partion)
    print(hh_partion)
    print(mm_partion)
    return ds_partion, hh_partion, mm_partion, latest
def odps_vealue(odps_connect, ds, hh, table_name='table_name', limit=10):
    """Read columns ``a`` and ``b`` from one ``ds``/``hh`` partition.

    If more columns are needed, add them to the SELECT and to the column
    list below; alternatively iterate the frame with ``df.iterrows()``.

    Args:
        odps_connect: ODPS entry object used to run the query.
        ds: Value of the ``ds`` partition column, e.g. ``'20211206'``.
        hh: Value of the ``hh`` partition column, e.g. ``'10'``.
        table_name: Table to query (placeholder; replace with the real name).
        limit: Maximum number of rows to fetch; ``None`` fetches all rows.

    Returns:
        A ``pandas.DataFrame`` with columns ``col_a`` and ``col_b`` (one row
        per record; empty if the partition has no rows), e.g.::

              col_a col_b
            0     1     a
            1     2     b
    """
    sql = "select a, b from {} where ds = '{}' and hh = '{}'".format(
        table_name, ds, hh)
    if limit is not None:
        sql += " limit {}".format(int(limit))
    with odps_connect.execute_sql(sql).open_reader(tunnel=True) as reader:
        # Each record iterates as (column_name, value) pairs in SELECT order,
        # so the values line up positionally with ['col_a', 'col_b'].
        rows = [[value for _, value in record] for record in reader]
    # Passing the column names explicitly keeps an empty result well-formed
    # instead of failing on a column-count mismatch.
    return pd.DataFrame(rows, columns=['col_a', 'col_b'])
def get_api(a, b, url='http://xxxxxxxxxxxxxxxxxxxxx', timeout=30):
    """Call the URL-generating API for one source row and return ``[id, url]``.

    Args:
        a: Value of ``col_a``; returned unchanged as the row id.
        b: Value of ``col_b``; sent to the API together with ``a``.
        url: Endpoint of the API (placeholder; replace with the real one).
        timeout: Seconds to wait for the API before giving up.

    Returns:
        ``[a, generated_url]`` where ``generated_url`` is ``data.Url`` from
        the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        KeyError: If the response JSON has no ``data.Url`` field.
    """
    # Adjust the headers to whatever the API documentation requires.
    headers = {'content-type': 'application/json'}
    # TODO(review): rename these keys to the parameter names the API expects.
    payload = {'a': a, 'b': b}
    resp = requests.post(url, data=json.dumps(payload), headers=headers,
                         timeout=timeout)
    resp.raise_for_status()
    # Only the "Url" value inside the response's "data" object is needed.
    generated_url = resp.json()["data"]["Url"]
    print(generated_url)
    return [a, generated_url]
def main():
    """Generate ``[id, url]`` pairs for the newest partition and print them."""
    # mm and the full partition key are not needed yet, so both are discarded.
    ds, hh, _, _ = get_new_partition(odps_connect)
    info_df = odps_vealue(odps_connect, ds, hh)
    # Iterating the columns yields plain Python scalars (unlike iterrows,
    # which can yield numpy scalars), which json can serialise in get_api.
    data_list = [get_api(a, b)
                 for a, b in zip(info_df['col_a'], info_df['col_b'])]
    result = pd.DataFrame(data_list, columns=['id', 'Url'])
    result_list = result.values.tolist()
    # e.g. [['1', 'http://...'], ['2', 'http://...']]
    print(result_list)


if __name__ == "__main__":
    main()