# ETL

# 导入

通过如下方式导入etl api

import etl from "svr-api/etl";

# 方法

export interface TransformGISLocationsArgs {
	/**
	 * 坐标查询服务提供商,如:`amap`, `bmap`,默认 `amap`,即高德。
	 */
	server?: string;

	/**
	 * 设置是否是调试状态。默认false。
	 * 
	 * 调试状态时:
	 * 1. 会通过脚本执行日志输出运行信息,方便查找问题
	 */
	debug: boolean;

	/**
	 * 并发线程数,可以多线程并发查询坐标,默认3。
	 * 
	 * 由于大量高并发请求会被高德识别为抓取数据,导致key无法正常使用,
	 * 如果查询的数据量不大,且对查询速度没有要求,建议不要将该参数设置太大,使用默认值就行
	 */
	threadsCount?: number;

	/**
	 * 每个地址尝试查询的次数,default is 8
	 */
	tryTimes?: number;

	/**
	 * 项目名
	 */
	projectName: string;

	/**
	 * 记录要更新地址的单位信息,元数据路径id或数据库表信息对象
	 */
	srcTable: string | DbTableInfo;

	/**
	 * 只更新满足条件的数据。
	 * 
	 * 可以是一个字符串,如`"FIELD1=123 AND FIELD2='abc'"`,也可以是一个json,如`{"FIELD1":123, "FIELD2":"abc"}`
	 */
	filter?: string;

	/**
	 * 从srcTable中最多查询多少行数据?
	 * 
	 * 有时候为了测试,希望只检查部分数据,所以可以传递一个最多处理的数据行数,比如1000,默认查询所有满足条件的数据
	 */
	limit?: number;

	/**
	 * 单次查询最多查多少条数据。
	 *
	 * 最大值为40w,未设置时默认取最大值。
	 * 
	 * `GIS`查询时是一个地区查询一次,单次查询上限40w,并发环境下,内存最大数据
	 * 量可达40w*n条,n为线程数,可能出现内存溢出,可通过设置此参数减少最大数据
	 * 条数,从而避免溢出。
	 */
	pageSize?: number;

	/**
	 * 指定一个行政区划代码,作为默认的搜索范围。
	 * 
	 * gis服务器上进行地址编码搜索需要能设置一个行政区划作为搜索的范围,通常数据中应该有行政区划,
	 * 如果数据中没有行政区划字段,那么将使用这个范围进行检索,搜索的所有地址都应该是这个行政区划范围内的。
	 * 
	 * FIXME 先只支持直接传中文
	 */
	defaultAdCode?: string;

	/**
	 * 定义源表中的字段名约定
	 */
	srcTableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 行政区划字段,可选。
		 * 
		 * 1. 如果存在,那么将根据行政区划分区并发进行地址检索
		 * 2. 如果不存在,那么搜索地址时将使用`defaultAdCode`参数所指定的值,
		 */
		adCode?: string;

		/**
		 * 地址 必须存在。
		 * 
		 * 当根据第一个地址搜索找不到准确的经纬度时,将尝试使用后续的地址,尝试次数(包括因为网络异常导致的尝试),不能超过`tryTimes`的设置
		 */
		address: string;
		/**
		 * 地址2,可选
		 */
		address2?: string;
		/**
		 * 地址3,可选
		 */
		address3?: string;
		/**
		 * 地址4,可选
		 */
		address4?: string;
	}

	/**
	 * 地址校订表信息。
	 * 
	 * 有些地址在高德是查询不到准确地点的(比如“武汉市东湖新技术开发区关南科技工业园1栋”,只能定位到“武汉市”,必须去掉“科技”二字才行),
	 * 考虑到很多地址其实高德都收录了,但是高德的地址的名称可能和我们数据中的不一致,所以提供revise参数用于在查询地址前把地址先人工“校订”一
	 * 下,在检索每个地址前都会根据这里的“校订”表进行地址校订后再到gis服务器进行查询。
	 */
	addressRevises?: Array<{
		/**
		 * 一个匹配字符串,如"关南科技工业园",或"关南*工业园*",星号表示通配符,地址中被匹配的字符串将会替换为replace指定的字符串。
		 */
		match: string;
		/**
		 * 地址中被匹配的字符串将会替换为replace指定的字符串。
		 */
		replace: string;
	}>;

	/**
	 * 将搜索到的经纬度或行政区划写到指定的目标表。目标表可以和源表是同一个表,同一个表时会先写入到临时表,再更新到源表。
	 * FIXME 暂时只支持更新到原表,本参数有值则更新原表,无值则不更新。
	 */
	targetTable?: string;

	/**
	 * 目标表的约定字段名。
	 */
	targetTableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 如果指定了行政区划字段,那么将会把根据地址搜索到的行政区划写入到目标字段。
		 * 
		 * 有些时候源表中只有一个地址,但是不知道所在的行政区划,所以这里希望通过gis服务器的地址编码能力找到行政区划。
		 */
		adCode?: string;
		/**
		 * 经度字段,存在则写入,如果字段是整型,则会自动转换为整型后写入
		 */
		lng?: string;
		/**
		 * 维度字段,存在则写入,如果字段是整型,则会自动转换为整型后写入
		 */
		lat?: string;

		/**
		 * 聚集信息,当需要聚集经纬度时传入,可以将113.977828,30.749883的精确值聚集到113.977000,30.749000,便于热力图查询
		 * 写入聚集后的经度字段,如果字段是整型,则会自动转换为整型后写入
		 */
		aggLng?: string;

		/**
		 * 写入聚集后的纬度字段,如果字段是整型,则会自动转换为整型后写入
		 */
		aggLat?: string;

		/**
		 * 目标表中需要计算地址所在区域的字段。如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
		 * 
		 * 可以指定多个字段,字段必须对应相应的地理角色、地理角色必须关联正确的地图,系统将取得地图的区块的边界进行判断,将地址所在的
		 * 区块对应的地理角色的维项ID写入到字段中。
		 */
		zones?: Array<string>;
	}

	/**
	 * 聚集米数,如按照100米精度进行聚集
	 */
	aggMeters?: number;

	/**
	 * 查询结果偏差(和district参数比较)的容忍度
	 * @deprecated 需要改为{@link #acceptAdCodeResult}和{@link #acceptDiffAdCode}
	 */
	confidence?: number;

	/**
	 * 标记是否接受搜索的地址结果是一个行政区划地址。判断依据是gis返回结果的精确程度。
	 * 
	 * 使用gis服务器的地址编码搜索时可能有些地址高德服务器是搜索不到的,此时它会直接返回所在的城市、省、区。此时可以设置:
	 * 1. no 默认,不接受行政区划地址结果 
	 * 2. county 接受县区结果
	 * 3. city 接受市结果
	 * 4. all 接受所有结果
	 */
	acceptAdCodeResult: "all" | "city" | "county" | "no";

	/**
	 * 是否接受搜索结果和源数据的行政区划不一致。判断依据是gis结果和当前数据的差异程度。
	 * 
	 * 当源数据中有行政区划信息,但是搜索的结果所在的行政区划和源数据库中的不一致时也可能时错误的。此时用户可以设置:
	 * 1. no 默认,不接受搜索结果所在的行政区划和源数据的不一致的结果
	 * 2. province 接受同省但不同市的j结果
	 * 3. city 接受同市不同县区的结果
	 * 4. all 接受所有结果
	 * 
	 * 与{@link #acceptAdCodeResult}的区别是,对于一个地址的查询:
	 * 
	 * 1. 结果就是一个城市或区,比如武汉市江汉区,但这其实是不对的,因为我的地址不是区,而是一个具体的地址,{@link #acceptAdCodeResult}
	 * 2. 查询的结果可能是一个准确地址,但是高德告诉我,这个地址在别的城市或者区,{@link #acceptDiffAdCode}
	 */
	acceptDiffAdCode: "all" | "province" | "city" | "no";

	/**
	 * 记录搜索不到经纬度的数据信息,方便用户排查问题。 地址不准确有两个判断条件:
	 * 
	 * 1. 有些地址高德服务器是搜索不到的,此时它会直接返回所在的城市、省、区。通过参数`acceptAdCodeResult`,如果不接受行政区划结果,那么就是错误的结果。
	 * 2. 当源数据中有行政区划信息,但是搜索的结果所在的行政区划和源数据库中的不一致时也可能时错误的。通过参数`acceptDiffAdCode`控制,不接受的结果就是错误的结果。
	 * 
	 * 不准确的(错误的)结果不会写入到目标表(`targetTable`),但是可以写入到`errTable`表以便排查问题。
	 */
	errTable?: string;

	/**
	 * TODO 待实现,errTable的结构目前和临时表结构一致,并新增一个err_location字段
	 */
	errTableFields?: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 源表的行政区划字段
		 */
		adCode?: string;
		/**
		 * 搜索用的地址
		 */
		address: string;
		/**
		 * 校订后的地址
		 */
		revised_address: string;
		/**
		 * 结果地址
		 */
		err_location: string;
		/**
		 * 结果行政区划
		 */
		err_adCode?: string;
		/**
		 * 结果经度
		 */
		err_lng: string;
		/**
		 * 结果纬度
		 */
		err_lat: string;
	}
}

/**
 * 从指定的数据库表中查询出文本地址信息,并调用互联网上的经纬度坐标查询服务,获取经纬度后再写入表中的经纬度字段中。
 * 
 * 1. 支持多线程并发访问公共地图地址编码接口,快速查询海量数据的坐标信息。
 * 2. 查询过程中将耗费大量的cpu和内存资源,建议JVM最大内存设置在4G以上。
 * 
 * @param args 
 */
export function transformGISLocations(args: TransformGISLocationsArgs): void;

export interface TransformLocationsArgs {
	addresses: Array<{
		address1: string,
		address2?: string,
		address3?: string,
	}>;
}

/**
 * 根据传入的数组中的地址信息,查询经纬度并返回。此函数适用于小量数据的查询。
 * 
 * @param args 
 */
export function transformLocations(args: TransformLocationsArgs): Array<{ address: string, lng: number, lat: number }>;

export interface TransformLocationZonesArgs {
	/**
	 * 记录要更新地区信息的表的元数据对象或者元数据路径id
	 */
	table: string;
	/**
	 * 只更新满足条件的数据。
	 * 
	 * 可以是一个字符串,如`"FIELD1=123 AND FIELD2='abc'"`,也可以是一个json,如`{"FIELD1":123, "FIELD2":"abc"}`
	 */
	filter?: string;

	/**
	 * 从srcTable中最多查询多少行数据?
	 * 
	 * 有时候为了测试,希望只检查部分数据,所以可以传递一个最多处理的数据行数,比如1000,默认查询所有满足条件的数据
	 */
	limit?: number;

	/**
	 * 目标表的约定字段名。
	 */
	tableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 经度字段,必须存在,且必须已经有正确的经纬度信息了
		 */
		lng: string;
		/**
		 * 维度字段,必须存在,且必须已经有正确的经纬度信息了
		 */
		lat: string;

		/**
		 * 目标表中需要计算地址所在区域的字段。如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
		 * 
		 * 可以指定多个字段,字段必须对应相应的地理角色、地理角色必须关联正确的地图,系统将取得地图的区块的边界进行判断,将地址所在的
		 * 区块对应的地理角色的维项ID写入到字段中。
		 */
		zones: Array<string>;
	}

}

/**
 * 根据已有的经纬度信息计算经纬度所在的地区并将计算的地区信息更新到表中。
 * 
 * 如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
 */
export function transformLocationZones(args: TransformLocationZonesArgs): void;

/**
 * 在数据源之间迁移数据库表
 * @param args 
 */
export function copyTables(args: CopyTablesArgs): void;

/**
 * 立即执行一个数据加工提取
 * @param resIdOrPath 资源ID或路径 
 * @returns 返回一个json字符串,里面存放etl执行的日志,格式为{logs:[]}
 */
export function runETL(resIdOrPath: string, params?: JSONObject): string;

/**
 * 立即执行计划调度,同步执行
 * @param id 计划ID
 */
export function runSchedule(id: string): void;

/**
 * 立即执行计划调度,异步执行
 * @param id 计划ID
 */
export function runScheduleAsync(id: string): void;

/**
 * 初始化系统日期维的数据服务。
 * 用于系统日期维的初始化数据的脚本调用。
 * @param start 日期起,例:20000101
 * @param end 日期止,例:20301231
 * @param defineFestivals 字符串数组,自定义节假日,通过客户端脚本传递。
 *        数组格式:["20210926 班","20211001*国庆"] 带*表示假日,不带表示上班调休。 
 */
export function initSysDateDim(start: string, end: string, defineFestivals: string[]): void;

# 对象

/**
 * 此文件定义一些数据加工相关的数据类型。
 */

declare type IDataFlowDbTableName = string | { datasourceName?: string, schema?: string, name: string };

declare type IDataFlowDataSetArray = Array<IDataFlowDataRow>;

declare interface IDataFlowDataRow {
	[fieldName: string]: any;
}

/**
 * 传入给脚本加工节点的参数。通过此参数可以获取和设置加工过程中的所有上下文信息。
 */
declare interface IDataFlowScriptNodeContext {
	/**
	 * 是否为预览数据
	 */
	isPreview: boolean;

	/**
	 * 从前端传进来的原始查询query信息
	 */
	queryInfo: QueryInfo
	
	/**
	 * 获取脚本输入节点的个数
	 */
	getInputCount(): number;
	/**
	 * 获取脚本输入节点的信息
	 * @param index 
	 */
	getInput(index?: number): FlowScriptNodeInputData;

	/**
	 * 获取脚本输出节点的信息,可通过这个对象设置脚本节点输出信息
	 */
	getOutput(): FlowScriptNodeOutputData;
	/**
	 * 获取脚本节点的属性信息
	 */
	getNodeInfo(): DataFlowNodeInfo;
}

/**
 * 脚本输入节点的信息
 */
declare interface FlowScriptNodeInputData {

	/**
	 * 获取节点属性信息。
	 */
	getNodeInfo(): DataFlowNodeInfo;

	/**
	 * 获取输入字段结构。
	 */
	getFields(): Array<DbFieldInfo>;

	/**
	 * 获取前序节点查询sql,若是流式节点,则会返回落地到临时表后的查询sql
	 */
	getSql(): string;

	/**
	 * 返回是否有下一行数据,调用后会开启流式查询。
	 */
	hasNext(): boolean;

	/**
	 * 获取下一行数据
	 */
	nextRow(): Array<any>;

	/**
	 * 返回节点数据源
	 */
	getDataSouce(): string;
}

/**
 * 脚本输出节点信息,可通过该对象设置加工过程中的所有上下文信息
 */
declare interface FlowScriptNodeOutputData {

	/**
	 * 设置脚本输出的字段结构,若不设置,则认为节点没有输出数据。(字段类型默认为字符型,字段长度默认为50,浮点型字段的小数点位数默认为2)
	 * @param fields
	 */
	setFields(fields: Array<DbFieldInfo>): void;

	/**
	 * 添加一批字段(字段类型默认为字符型,字段长度默认为50,浮点型字段的小数点位数默认为2)
	 * @param fields
	 */
	addFields(fields: Array<DbFieldInfo>): void;

	/**
	 * 添加一个字段,字段类型默认为字符型,字段长度默认为50,浮点型字段的小数点位数默认为2
	 * @param field 
	 * @param index 默认从0开始,若传入值超过字段个数,则默认放到字段末尾
	 */
	addField(field: DbFieldInfo, index?: number): DbFieldInfo;

	/**
	 * 添加一个字段
	 */
	addField(name: string, dataType?: FieldDataType, length?: number, decimal?: number, dbfield?: string): void;

	/**
	 * 移除一个字段
	 * @param fieldName 
	 */
	removeField(fieldName: string): DbFieldInfo;
	/**
	 * 获取所有字段信息
	 */
	getFields(): Array<DbFieldInfo>;

	/**
	 * 设置物理表信息,后序节点会在此基础上查询
	 * @param ds 数据源
	 * @param schema 模式
	 * @param name 表名
	 */
	setDbTableInfo(ds: string, schema: string, name: string): void;

	/**
	 * 设置sql,后序节点会在此基础上查询。
	 * @param sql 
	 * @param ds 数据源
	 * @param schema 模式
	 */
	setSql(sql: string, ds: string, scheme: string): void;

	/**
	 * 输出一条数据给后序节点。
	 * @param row 
	 */
	writeRow(row: Array<any>): void;
}
是否有帮助?
0条评论
评论