# ETL

# 导入

通过如下方式导入etl api

import etl from "svr-api/etl";

# 方法

export interface TransformGISLocationsArgs {
	/**
	 * 坐标查询服务提供商,如:`amap`, `bmap`,默认 `amap`,即高德。
	 */
	server?: string;

	/**
	 * 设置是否是调试状态。默认false。
	 * 
	 * 调试状态时:
	 * 1. 会通过脚本执行日志输出运行信息,方便查找问题
	 */
	debug: boolean;

	/**
	 * 并发线程数,可以多线程并发查询坐标,默认3。
	 * 
	 * 由于大量高并发请求会被高德识别为抓取数据,导致key无法正常使用,
	 * 如果查询的数据量不大,且对查询速度没有要求,建议不要将该参数设置太大,使用默认值就行
	 */
	threadsCount?: number;

	/**
	 * 每个地址尝试查询的次数,default is 8
	 */
	tryTimes?: number;

	/**
	 * 项目名
	 */
	projectName: string;

	/**
	 * 记录要更新地址的单位信息,元数据路径id或数据库表信息对象
	 */
	srcTable: string | DbTableMeta;

	/**
	 * 只更新满足条件的数据。
	 * 
	 * 可以是一个字符串,如`"FIELD1=123 AND FIELD2='abc'"`,也可以是一个json,如`{"FIELD1":123, "FIELD2":"abc"}`
	 */
	filter?: string;

	/**
	 * 从srcTable中最多查询多少行数据?
	 * 
	 * 有时候为了测试,希望只检查部分数据,所以可以传递一个最多处理的数据行数,比如1000,默认查询所有满足条件的数据
	 */
	limit?: number;

	/**
	 * 指定一个行政区划代码,作为默认的搜索范围。
	 * 
	 * gis服务器上进行地址编码搜索需要能设置一个行政区划作为搜索的范围,通常数据中应该有行政区划,
	 * 如果数据中没有行政区划字段,那么将使用这个范围进行检索,搜索的所有地址都应该是这个行政区划范围内的。
	 * 
	 * FIXME 先只支持直接传中文
	 */
	defaultAdCode?: string;

	/**
	 * 定义源表中的字段名约定
	 */
	srcTableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 行政区划字段,可选。
		 * 
		 * 1. 如果存在,那么将根据行政区划分区并发进行地址检索
		 * 2. 如果不存在,那么搜索地址时将使用`defaultAdCode`参数所指定的值,
		 */
		adCode?: string;

		/**
		 * 地址 必须存在。
		 * 
		 * 当根据第一个地址搜索找不到准确的经纬度时,将尝试使用后续的地址,尝试次数(包括因为网络异常导致的尝试),不能超过`tryTimes`的设置
		 */
		address: string;
		/**
		 * 地址2,可选
		 */
		address2?: string;
		/**
		 * 地址3,可选
		 */
		address3?: string;
		/**
		 * 地址4,可选
		 */
		address4?: string;
	}

	/**
	 * 地址校订表信息。
	 * 
	 * 有些地址在高德是查询不到准确地点的(比如“武汉市东湖新技术开发区关南科技工业园1栋”,只能定位到“武汉市”,必须去掉“科技”二字才行),
	 * 考虑到很多地址其实高德都收录了,但是高德的地址的名称可能和我们数据中的不一致,所以提供revise参数用于在查询地址前把地址先人工“校订”一
	 * 下,在检索每个地址前都会根据这里的“校订”表进行地址校订后再到gis服务器进行查询。
	 */
	addressRevises?: Array<{
		/**
		 * 一个匹配字符串,如"关南科技工业园",或"关南*工业园*",星号表示通配符,地址中被匹配的字符串将会替换为replace指定的字符串。
		 */
		match: string;
		/**
		 * 地址中被匹配的字符串将会替换为replace指定的字符串。
		 */
		replace: string;
	}>;

	/**
	 * 将搜索到的经纬度或行政区划写到指定的目标表。目标表可以和源表是同一个表,同一个表时会先写入到临时表,再更新到源表。
	 * FIXME 暂时只支持更新到原表,本参数有值则更新原表,无值则不更新。
	 */
	targetTable?: string;

	/**
	 * 目标表的约定字段名。
	 */
	targetTableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 如果指定了行政区划字段,那么将会把根据地址搜索到的行政区划写入到目标字段。
		 * 
		 * 有些时候源表中只有一个地址,但是不知道所在的行政区划,所以这里希望通过gis服务器的地址编码能力找到行政区划。
		 */
		adCode?: string;
		/**
		 * 经度字段,存在则写入,如果字段是整型,则会自动转换为整型后写入
		 */
		lng?: string;
		/**
		 * 维度字段,存在则写入,如果字段是整型,则会自动转换为整型后写入
		 */
		lat?: string;

		/**
		 * 聚集信息,当需要聚集经纬度时传入,可以将113.977828,30.749883的精确值聚集到113.977000,30.749000,便于热力图查询
		 * 写入聚集后的经度字段,如果字段是整型,则会自动转换为整型后写入
		 */
		aggLng?: string;

		/**
		 * 写入聚集后的纬度字段,如果字段是整型,则会自动转换为整型后写入
		 */
		aggLat?: string;

		/**
		 * 目标表中需要计算地址所在区域的字段。如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
		 * 
		 * 可以指定多个字段,字段必须对应相应的地理角色、地理角色必须关联正确的地图,系统将取得地图的区块的边界进行判断,将地址所在的
		 * 区块对应的地理角色的维项ID写入到字段中。
		 */
		zones?: Array<string>;
	}

	/**
	 * 聚集米数,如按照100米精度进行聚集
	 */
	aggMeters?: number;

	/**
	 * 查询结果偏差(和district参数比较)的容忍度
	 * @deprecated 需要改为{@link #acceptAdCodeResult}和{@link #acceptDiffAdCode}
	 */
	confidence?: number;

	/**
	 * 标记是否接受搜索的地址结果是一个行政区划地址。判断依据是gis返回结果的精确程度。
	 * 
	 * 使用gis服务器的地址编码搜索时可能有些地址高德服务器是搜索不到的,此时它会直接返回所在的城市、省、区。此时可以设置:
	 * 1. no 默认,不接受行政区划地址结果 
	 * 2. county 接受县区结果
	 * 3. city 接受市结果
	 * 4. all 接受所有结果
	 */
	acceptAdCodeResult: "all" | "city" | "county" | "no";

	/**
	 * 是否接受搜索结果和源数据的行政区划不一致。判断依据是gis结果和当前数据的差异程度。
	 * 
	 * 当源数据中有行政区划信息,但是搜索的结果所在的行政区划和源数据库中的不一致时也可能时错误的。此时用户可以设置:
	 * 1. no 默认,不接受搜索结果所在的行政区划和源数据的不一致的结果
	 * 2. province 接受同省但不同市的j结果
	 * 3. city 接受同市不同县区的结果
	 * 4. all 接受所有结果
	 * 
	 * 与{@link #acceptAdCodeResult}的区别是,对于一个地址的查询:
	 * 
	 * 1. 结果就是一个城市或区,比如武汉市江汉区,但这其实是不对的,因为我的地址不是区,而是一个具体的地址,{@link #acceptAdCodeResult}
	 * 2. 查询的结果可能是一个准确地址,但是高德告诉我,这个地址在别的城市或者区,{@link #acceptDiffAdCode}
	 */
	acceptDiffAdCode: "all" | "province" | "city" | "no";

	/**
	 * 记录搜索不到经纬度的数据信息,方便用户排查问题。 地址不准确有两个判断条件:
	 * 
	 * 1. 有些地址高德服务器是搜索不到的,此时它会直接返回所在的城市、省、区。通过参数`acceptAdCodeResult`,如果不接受行政区划结果,那么就是错误的结果。
	 * 2. 当源数据中有行政区划信息,但是搜索的结果所在的行政区划和源数据库中的不一致时也可能时错误的。通过参数`acceptDiffAdCode`控制,不接受的结果就是错误的结果。
	 * 
	 * 不准确的(错误的)结果不会写入到目标表(`targetTable`),但是可以写入到`errTable`表以便排查问题。
	 */
	errTable?: string;

	/**
	 * TODO 待实现,errTable的结构目前和临时表结构一致,并新增一个err_location字段
	 */
	errTableFields?: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 源表的行政区划字段
		 */
		adCode?: string;
		/**
		 * 搜索用的地址
		 */
		address: string;
		/**
		 * 校订后的地址
		 */
		revised_address: string;
		/**
		 * 结果地址
		 */
		err_location: string;
		/**
		 * 结果行政区划
		 */
		err_adCode?: string;
		/**
		 * 结果经度
		 */
		err_lng: string;
		/**
		 * 结果纬度
		 */
		err_lat: string;
	}
}

/**
 * 从指定的数据库表中查询出文本地址信息,并调用互联网上的经纬度坐标查询服务,获
 * 取经纬度后再写入表中的经纬度字段中。
 * 
 * 使用此服务需要配置地图Web服务Key。
 *
 * 1. 支持多线程并发访问公共地图地址编码接口,快速查询海量数据的坐标信息。
 * 2. 查询过程中将耗费大量的cpu和内存资源,建议JVM最大内存设置在4G以上。
 *
 * @param args 
 * @return 返回执行过程中的任务信息
 */
export function transformGISLocations(args: TransformGISLocationsArgs): { [propName: string]: any };

export interface TransformLocationsArgs {
	addresses: Array<{
		address1: string,
		address2?: string,
		address3?: string,
	}>;
}

/**
 * 根据传入的数组中的地址信息,查询经纬度并返回。此函数适用于小量数据的查询。
 * 
 * @param args 
 */
export function transformLocations(args: TransformLocationsArgs): Array<{ address: string, lng: number, lat: number }>;

export interface TransformLocationZonesArgs {
	/**
	 * 记录要更新地区信息的表的元数据对象或者元数据路径id
	 */
	table: string;
	/**
	 * 只更新满足条件的数据。
	 * 
	 * 可以是一个字符串,如`"FIELD1=123 AND FIELD2='abc'"`,也可以是一个json,如`{"FIELD1":123, "FIELD2":"abc"}`
	 */
	filter?: string;

	/**
	 * 从srcTable中最多查询多少行数据?
	 * 
	 * 有时候为了测试,希望只检查部分数据,所以可以传递一个最多处理的数据行数,比如1000,默认查询所有满足条件的数据
	 */
	limit?: number;

	/**
	 * 目标表的约定字段名。
	 */
	tableFields: {
		/**
		 * 数据行唯一标示字段,必须存在
		 */
		id: string;
		/**
		 * 经度字段,必须存在,且必须已经有正确的经纬度信息了
		 */
		lng: string;
		/**
		 * 维度字段,必须存在,且必须已经有正确的经纬度信息了
		 */
		lat: string;

		/**
		 * 目标表中需要计算地址所在区域的字段。如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
		 * 
		 * 可以指定多个字段,字段必须对应相应的地理角色、地理角色必须关联正确的地图,系统将取得地图的区块的边界进行判断,将地址所在的
		 * 区块对应的地理角色的维项ID写入到字段中。
		 */
		zones: Array<string>;
	}

}

/**
 * 根据已有的经纬度信息计算经纬度所在的地区并将计算的地区信息更新到表中。
 * 
 * 如计算企业所在的自贸区、计算企业是否在长江沿岸3公里内……。
 */
export function transformLocationZones(args: TransformLocationZonesArgs): void;


/**
 * 数据迁移。
 * 
 * 是一个支持从多种源迁移数据到多种目的地的服务。
 * 
 * 提供的能力:
 * 
 * 1. 可以处理不同类型的数据源,例如数据库、文件等。
 * 2. 支持多个表并行数据迁移。
 * 
 * 
 * 
 * @param args 
 */
export function copyTables(args: MigrateDataArgs): MigrateDataResult;

/**
 * 立即执行一个数据加工提取
 *
 * 如果数据加工提取出错,会抛出异常,如果不期望抛出异常,请使用try..catch
 *
 * @param resIdOrPath 资源ID或路径 
 * @param params 加工定义的参数
 * @returns 返回提取日志信息列表
 */
export function runETL(resIdOrPath: string, params?: JSONObject): Array<LogItemInfo>

/**
 * 立即执行计划调度,异步执行。
 *
 * 参数`args.taskRunArgs`可以设置任务运行时传递给任务的参数。如`{"param1":123, "param2"="测试"}`,调
 * 度时会自动传递给所有的任务,计划和任务本身也支持参数设置,他们的参数设置优先级低于这里传递的值,
 * 一个任务对象如加工在具体执行时,其内部的参数取值优先级以此如下:
 *
 * 1. `args.taskRunArgs`传递的参数值
 * 2. 计划任务上设置的参数
 * 3. 计划上设置的参数
 * 4. 加工内部参数定义上的默认值
 * 
 * 此函数默认不等待计划执行完成,传递`args.waitFinish`为true可以让此函数等待计划执行完成。
 *
 * @param id 计划ID
 * @param args 一些附加参数,见上文描述
 */
export function runSchedule(id: string, args?: { taskRunArgs: JSONObject, waitFinish?: boolean }): void;

/**
 * 初始化系统日期维的数据服务。
 * 用于系统日期维的初始化数据的脚本调用。
 * 
 * @param start 日期起,例:20000101
 * @param end 日期止,例:20301231
 * @param defineFestivals 字符串数组,自定义节假日,通过客户端脚本传递。
 *        数组格式:["20210926 班","20211001*国庆"] 带*表示假日,不带表示上班调休。 
 */
export function initSysDateDim(start: string, end: string, defineFestivals: string[]): void;

/**
 * 导入数据文件为物理表。
 * 
 * 支持导入多个csv/csz文件,也支持导入zip文件。
 * 
 * @deprecated 请使用 {@link copyTables}
 * @param args 
 */
export function importData(args: ImportDataFileToDBArgs): MigrateDataResult;

# 对象

/**
 * 此文件定义一些数据加工相关的数据类型。
 */


/**
 * 传入给脚本加工节点的参数。通过此参数可以获取和设置加工过程中的所有上下文信息。
 */
declare interface IDataFlowScriptNodeContext {
	/**
	 * 是否为预览数据
	 */
	isPreview: boolean;

	/**
	 * 加工定义的全局参数
	 */
	params?: JSONObject;

	/**
	 * 当前查询起始位置。
	 * 
	 * {@link isPreview} 时生效。
	 */
	offset?: number;

	/**
	 * 当前查询组大行数。
	 * 
	 * {@link isPreview} 时生效。
	 */
	limit?: number;

	/**
	 * 获取脚本输入节点的个数
	 */
	getInputCount(): number;

	/**
	 * 获取脚本输入节点的信息
	 * @param index 不传返回第一个
	 * @returns 不传index返回第一个,不存在的索引,返回null
	 */
	getInput(index?: number): FlowScriptNodeInputData;

	/**
	 * 获取脚本输出节点的信息,可通过这个对象设置脚本节点输出信息
	 */
	getOutput(): FlowScriptNodeOutputData;

	/**
	 * 获取当前节点的属性信息。
	 * 
	 * @return 返回的JSON对象是只读的,修改内部属性不会生效。
	 */
	getNodeInfo(): DataFlowNodeInfo;

}

/**
 * 拓展节点后端脚本上下文
 */
declare interface IDataFlowExtendNodeContext extends IDataFlowScriptNodeContext {

	/**
	 * 拓展节点的自定义属性
	 */
	properties?: JSONObject;
}

/**
 * 脚本输入节点的信息
 */
declare interface FlowScriptNodeInputData {

	/**
	 * 获取节点属性信息。
	 */
	getNodeInfo(): DataFlowNodeInfo;

	/**
	 * 返回的字段数量
	 */
	getFieldCount(): number;

	/**
	 * 获取指定列的字段属性
	 * 
	 * @param index 0是第一列
	 */
	getField(index: number): DwTableFieldInfo;

	/**
	 * 获取上游节点的表或sql。
	 * 
	 * @return 可空,如:上游是关联节点、流式节点,则返回Null。
	 * 
	 * @see FlowScriptNodeOutputData#setDbTableInfo
	 */
	getDbTableInfo(): DbDataSourceInfo;

	/**
	 * 返回上游节点的输出数据源。
	 * 
	 * 若上游是物理表、SQL节点,则等价于`getDbTableInfo().dataSource`。
	 * 若上游是文件节点,则会直接将文件流作为脚本的输入流,不涉及数据源,返回Null。
	 * 
	 * @return 可空。
	 */
	getDataSouce(): string;

	/**
	 * 获取下一行数据。
	 * 
	 * 与 {@link #getObject()} 配合使用。
	 * 
	 * ```js
	 * let fieldCount = inputNode.getFieldCount();
	 * while (inputNode.next()) {
	 * 		let row = [];
	 * 		for (let i = 0; i < fieldCount; i++) {
	 * 			row[i] = inputNode.getObject(i);
	 * 		}
	 * 		// do something with row
	 * }
	 * ```
	 * 
	 * @reutrns 返回true表示存在下一行,false表示没有更多数据了。
	 */
	next(): boolean;

	/**
	 * 读取当前行指定列的数据。
	 * 
	 * @param index 0是第一列
	 */
	getObject(index: number): any;

	/**
	 * 获取全部字段信息。
	 * 
	 * @deprecated 将在新版本删除
	 */
	getFields(): Array<DwTableFieldInfo>

	/**
	 * 返回是否有下一行数据,调用后会开启流式查询。
	 * 
	 * @deprecated 将在新版本删除
	 */
	hasNext(): boolean;

	/**
	 * 获取当前遍历到一行数据
	 *
	 * 请在{@link #hasNext()}返回true之后调用,如果不调用或者返回false,脚本执行过程中会抛出异常,正确的调用示例参考:
	 * ```
	 * 	while(inputNode.hashNext()){
	 * 		let row inputNode.nextRow();
	 * 		// do something with row!
	 * }
	 * ```
	 * 
	 * @deprecated 将在新版本删除
	 */
	nextRow(): Array<any>;
}

/**
 * 脚本输出节点信息,可通过该对象设置加工过程中的所有上下文信息
 */
declare interface FlowScriptNodeOutputData {

	/**
	 * 设置节点的字段信息。
	 * 
	 * 通过api读取JSON时字段和数据是一起产生的,支持脚本执行完同时产生字段和数据。
	 * 
	 * 1. 设计器调用,加工节点的字段总是会按照设置的字段信息初始化。
	 * 2. 提取时调用,设置的信息将用于和节点字段映射,确定每行数据对应的字段顺序。
	 * 
	 * 对于流式数据,必须通过这个方法或{@link DataTransformExtensionScriptAction.onProcessFields}来设置字段,
	 * 未设置会抛出缺少字段异常,两个方法同时设置会抛出重复设置异常。
	 * 
	 * 输出表或sql时可以不设置字段,会自动读取对应的表结构作为字段信息。
	 * 
	 * @param fields 
	 * 
	 * @see DataTransformExtensionScriptAction.onProcessFields
	 */
	setFields(fields: DwTableFieldInfo[]): void;

	/**
	 * 设置节点输出的表或sql。
	 * 
	 * 拓展组件产生的结果可能是个表或sql,设置后会将这个表或sql的查询结果作为节点的数据,
	 * 后续的加工和提取也会基于这个表或sql来构造,可以当成一个物理表节点使用。
	 * 
	 * 预览节点数据时,切换页码或按列头排序会在表或sql的基础上构造查询,因此设置的sql不需要带有limit等信息,
	 * 当然如果业务本身就是对limit后的数据加工,还是可以带上。
	 * 
	 * 如果希望将输出表直接作为模型输出使用,可以在拓展节点后添加模型输出,选择使用上游节点表作为输出即可。
	 * 
	 * 设置后再执行 {@link #writeRow()} 将不起作用。
	 * 
	 * @param dbTable 
	 */
	setDbTableInfo(dbTable: DbDataSourceInfo): void;

	/**
	 * 输出一条数据给后序节点。
	 * 
	 * 数据量很大时,可能会阻塞设计器界面的数据预览,开发者可以判断{@link IDataFlowScriptNodeContext.isPreview}
	 * 输出少量数据,提高数据预览效率。
	 *
	 * @param row 
	 */
	writeRow(row: Array<any>): void;
}
是否有帮助?
0条评论
评论