# 数据加工脚本组件

SuccBI数据加工提供了常见的加工组件,通过简单的设置就可以使用,但是实际数据加工中可能存在一些无法满足的个性化加工需求,比如:

  1. 调用外部程序,如python从互联网爬取数据、Shell命令等。
  2. 调用互联网上的一些API服务来加工数据,比如将企业文本地址转换为经纬度。
  3. 读取一个特殊格式的数据文件进行加工。
  4. 执行一个复杂的业务逻辑(数据库存储过程无法实现)并将结果输出。
  5. 遍历加工流程中的上游节点的输出数据并进行处理后输出给下游加工节点。

为此SuccBI提供了“脚本”加工组件,帮助连接任意位置的任意格式的数据进行加工分析:

# 脚本加工

“脚本”加工节点可以让数据加工过程中拥有“编程”能力,完成任意个性化需求,脚本加工节点提供了如下能力:

  1. 调用外部程序的能力。
  2. 执行SQL的能力。
  3. 脚本编程能力(详见脚本开发须知)。
  4. 访问加工流程的上下文信息的能力。比如获取上游节点输出的数据、获取上游节点产生的SQL、输出数据给下游节点……

# API

# 与脚本相关的数据接口

/**
 * 此文件定义一些数据加工相关的数据类型。
 */


/**
 * 传入给脚本加工节点的参数。通过此参数可以获取和设置加工过程中的所有上下文信息。
 */
declare interface IDataFlowScriptNodeContext {
	/**
	 * 是否为预览数据
	 */
	isPreview: boolean;

	/**
	 * 加工定义的全局参数
	 */
	params?: JSONObject;

	/**
	 * 当前查询起始位置。
	 * 
	 * {@link isPreview} 时生效。
	 */
	offset?: number;

	/**
	 * 当前查询组大行数。
	 * 
	 * {@link isPreview} 时生效。
	 */
	limit?: number;

	/**
	 * 获取脚本输入节点的个数
	 */
	getInputCount(): number;

	/**
	 * 获取脚本输入节点的信息
	 * @param index 不传返回第一个
	 * @returns 不传index返回第一个,不存在的索引,返回null
	 */
	getInput(index?: number): FlowScriptNodeInputData;

	/**
	 * 获取脚本输出节点的信息,可通过这个对象设置脚本节点输出信息
	 */
	getOutput(): FlowScriptNodeOutputData;

	/**
	 * 获取当前节点的属性信息。
	 * 
	 * @return 返回的JSON对象是只读的,修改内部属性不会生效。
	 */
	getNodeInfo(): DataFlowNodeInfo;

}

/**
 * 拓展节点后端脚本上下文
 */
declare interface IDataFlowExtendNodeContext extends IDataFlowScriptNodeContext {

	/**
	 * 拓展节点的自定义属性
	 */
	properties?: JSONObject;
}

/**
 * 脚本输入节点的信息
 */
declare interface FlowScriptNodeInputData {

	/**
	 * 获取节点属性信息。
	 */
	getNodeInfo(): DataFlowNodeInfo;

	/**
	 * 返回的字段数量
	 */
	getFieldCount(): number;

	/**
	 * 获取指定列的字段属性
	 * 
	 * @param index 0是第一列
	 */
	getField(index: number): DwTableFieldInfo;

	/**
	 * 获取上游节点的表或sql。
	 * 
	 * @return 可空,如:上游是关联节点、流式节点,则返回Null。
	 * 
	 * @see FlowScriptNodeOutputData#setDbTableInfo
	 */
	getDbTableInfo(): DbDataSourceInfo;

	/**
	 * 返回上游节点的输出数据源。
	 * 
	 * 若上游是物理表、SQL节点,则等价于`getDbTableInfo().dataSource`。
	 * 若上游是文件节点,则会直接将文件流作为脚本的输入流,不涉及数据源,返回Null。
	 * 
	 * @return 可空。
	 */
	getDataSouce(): string;

	/**
	 * 获取下一行数据。
	 * 
	 * 与 {@link #getObject()} 配合使用。
	 * 
	 * ```js
	 * let fieldCount = inputNode.getFieldCount();
	 * while (inputNode.next()) {
	 * 		let row = [];
	 * 		for (let i = 0; i < fieldCount; i++) {
	 * 			row[i] = inputNode.getObject(i);
	 * 		}
	 * 		// do something with row
	 * }
	 * ```
	 * 
	 * @reutrns 返回true表示存在下一行,false表示没有更多数据了。
	 */
	next(): boolean;

	/**
	 * 读取当前行指定列的数据。
	 * 
	 * @param index 0是第一列
	 */
	getObject(index: number): any;

	/**
	 * 获取全部字段信息。
	 * 
	 * @deprecated 将在新版本删除
	 */
	getFields(): Array<DwTableFieldInfo>

	/**
	 * 返回是否有下一行数据,调用后会开启流式查询。
	 * 
	 * @deprecated 将在新版本删除
	 */
	hasNext(): boolean;

	/**
	 * 获取当前遍历到一行数据
	 *
	 * 请在{@link #hasNext()}返回true之后调用,如果不调用或者返回false,脚本执行过程中会抛出异常,正确的调用示例参考:
	 * ```
	 * 	while(inputNode.hashNext()){
	 * 		let row inputNode.nextRow();
	 * 		// do something with row!
	 * }
	 * ```
	 * 
	 * @deprecated 将在新版本删除
	 */
	nextRow(): Array<any>;
}

/**
 * 脚本输出节点信息,可通过该对象设置加工过程中的所有上下文信息
 */
declare interface FlowScriptNodeOutputData {

	/**
	 * 设置节点的字段信息。
	 * 
	 * 通过api读取JSON时字段和数据是一起产生的,支持脚本执行完同时产生字段和数据。
	 * 
	 * 1. 设计器调用,加工节点的字段总是会按照设置的字段信息初始化。
	 * 2. 提取时调用,设置的信息将用于和节点字段映射,确定每行数据对应的字段顺序。
	 * 
	 * 对于流式数据,必须通过这个方法或{@link DataTransformExtensionScriptAction.onProcessFields}来设置字段,
	 * 未设置会抛出缺少字段异常,两个方法同时设置会抛出重复设置异常。
	 * 
	 * 输出表或sql时可以不设置字段,会自动读取对应的表结构作为字段信息。
	 * 
	 * @param fields 
	 * 
	 * @see DataTransformExtensionScriptAction.onProcessFields
	 */
	setFields(fields: DwTableFieldInfo[]): void;

	/**
	 * 设置节点输出的表或sql。
	 * 
	 * 拓展组件产生的结果可能是个表或sql,设置后会将这个表或sql的查询结果作为节点的数据,
	 * 后续的加工和提取也会基于这个表或sql来构造,可以当成一个物理表节点使用。
	 * 
	 * 预览节点数据时,切换页码或按列头排序会在表或sql的基础上构造查询,因此设置的sql不需要带有limit等信息,
	 * 当然如果业务本身就是对limit后的数据加工,还是可以带上。
	 * 
	 * 如果希望将输出表直接作为模型输出使用,可以在拓展节点后添加模型输出,选择使用上游节点表作为输出即可。
	 * 
	 * 设置后再执行 {@link #writeRow()} 将不起作用。
	 * 
	 * @param dbTable 
	 */
	setDbTableInfo(dbTable: DbDataSourceInfo): void;

	/**
	 * 输出一条数据给后序节点。
	 * 
	 * 数据量很大时,可能会阻塞设计器界面的数据预览,开发者可以判断{@link IDataFlowScriptNodeContext.isPreview}
	 * 输出少量数据,提高数据预览效率。
	 *
	 * @param row 
	 */
	writeRow(row: Array<any>): void;
}

# 脚本模板

import etl from "svr-api/etl";

/**
 * 返回脚本节点字段结构。
 * 使用场景:
 * 1. 通过脚本爬取数据到数据仓库,需要生成定义好的字段结构。
 * 2. 通过脚本解析json数据,需要预解析几行数据生成字段。
 *
 * @returns 
 *  字段列表: 生成的字段列表应该是稳定的,应当尽量避免不同调用场景返回不同字段列表的情况
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DwTableFieldInfo[] {
	return [];
}

/**
 * 返回脚本节点的数据结构。
 * 
 * 使用场景:
 * 1. 调用api处理gis数据更新,并需要将处理后的物理表输出为模型。
 * 2. 调用api查询数据,读取仪表板或报表的查询条件作为参数,并将实时查询结果返回。
 *
 * @returns
 * 1.返回null 或者没有返回值,则视为已经通过 context.getOutput() 写出了输出
 * 2.可以直接返回DbTableInfo类型的数据,ddl存储查询的sql,table存储的是对应的物理表名
 * 3.一个二维数组:内存数据对象
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbDataSourceInfo | any[][] | null | void {
	return null;
}

# 脚本示例

# 使用高德API查询企业经纬度信息

import etl from "svr-api/etl";

/**
 * 利用高德坐标查询服务,批量转换千万级企业地址数据。
 */
// function onProcessScriptNode(context: IDataFlowScriptNodeContext): void {
// 	if (context.isPreview) {//用户在加工界面预览脚本节点的数据
// 		//在此处做轻量的查询并显示前50个企业的经纬度就好
// 		let data = context.getInputDataAsArray();

// 		let addresses = data.map(row => {
// 			return {
// 				address1: row["address1"],
// 				address2: row["address2"],
// 				address3: row["address3"],
// 			}
// 		});
// 		let locations = etl.transformLocations({
// 			addresses: addresses
// 		});

// 		data.forEach((row, i) => {
// 			row["lng"] = locations[i].lng;
// 			row["lat"] = locations[i].lat;
// 		});

// 		context.setOutputDataArray(data);
// 		return;
// 	}

// 	//TODO 考虑增量的情况 @fengyg
// 	/**将上游节点的数据输出到一个物理表中并返回物理表,如果上游节点就是一个物理表,那么直接返回 */
// 	let dbTable = context.getInputDataAsDbTable();
// 	// 调用接口进行gis经纬度更新
// 	etl.transformDbTableLocations({
// 		geoserver: 'amap',
// 		threadsCount: 16,
// 		default_province_prefix: '安徽省',
// 		geotable: dbTable,  // 中间表名,可以是物理表或模型表
// 		geotable_filter: "",
// 		startDate: '',
// 		endDate: '',
// 		id: 'id_',  //唯一编码
// 		district: 'DQ_DM',   //地区编码
// 		address: 'dwdz', //地址的字段
// 		address2: 'SYDW',  //单位的名称
// 		address3: '',
// 		adddetaillog: false,
// 		confidence: 1,
// 		lng: 'jd',
// 		lat: 'wd'
// 	});

// 	// 设置输出表
// 	context.setOutputDataAsDbtable(dbTable);
// }

# 爬取企查查数据作为测试数据

import * as http from "svr-api/http";

/**
 * 返回脚本节点字段结构。
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DwTableFieldInfo[] {
	return [
		{ name: "tableType" },
		{ name: "id" },
		{ name: "labels" },
		{ name: "keyNo" },
		{ name: "registCapi" },
		{ name: "name" },
		{ name: "econKind" },
		{ name: "hasImage" },
		{ name: "status" },
		{ name: "role" },
		{ name: "type" },
		{ name: "startNode" },
		{ name: "endNode" },
		{ name: "stockPercent", dataType: FieldDataType.N, scale: 2 },
		{ name: "shouldCapi", dataType: FieldDataType.N, scale: 2 },
	];
}

/**
 * 爬取企查查数据作为测试数据
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableMeta | any[][] {
	// 登录的企查查id
	let qccSessionID = "your-session-id";
	// 要查询的企业id
	let keyNos = ["your-key"];
	// 查询的层数
	let deep = 1;
	let urlPrefix = "https://www.qichacha.com/company_muhouAction?keyNo=";

	let outputData = context.getOutput();
	// 设置字段信息,添加tableType字段,用于区分该数据是属于关系表还是公司信息表
	let fieldsInfo: DwTableFieldInfo[] = onProcessFields(context);

	// 已经查询过的企业ID
	let processedKeyNo: { [keyNo: string]: boolean } = {};
	let getRelationNodes = (keyNos: string[]): string[] => {
		let keys: string[] = [];
		keyNos.forEach(key => {
			// 已经加载过的企业ID不必再进行加载
			if (processedKeyNo[key] != true) {
				// 根据url获取数据信息,并将其写入到输出流中
				let graph = getJsonData(urlPrefix + key, qccSessionID);
				keys.pushAll(writeData(graph, outputData, fieldsInfo));
				processedKeyNo[key] = true;
			}
		})
		return keys;
	}
	for (let i = 0; i < deep; i++) {
		// 根据企业Id,获取与其相关联的企业ID,并将关联企业信息与关联关系写入到输出流中
		keyNos = getRelationNodes(keyNos);
	}
	return null;
}

/**
 * 处理从网页上爬取的数据信息,并将其写入到输出节点
 * @param datas 
 * @param output 
 * @param tableType 
 * @param fieldsInfo 
 */
function writeData(graph: JSONObject, output: FlowScriptNodeOutputData, fieldsInfo: DwTableFieldInfo[]): string[] {
	if (graph == null) {
		return [];
	}
	let keyNos: string[] = [];
	let nodes: any[] = graph.nodes || [];
	let relationships: any[] = graph.relationships || [];
	let writeTableData = (datas: any[], tableType: "nodes" | "relationships") => {
		if (!datas?.length) {
			return;
		}
		let fieldCount = fieldsInfo.length;
		let outRow: any[] = new Array(fieldsInfo.length);
		datas.forEach(data => {
			outRow = [tableType];
			let keyNo = data["properties"]["keyNo"];
			if (keyNo) {
				keyNos.push(keyNo)
			}
			for (let i = 1; i < fieldCount; i++) {
				let fieldName = fieldsInfo[i].name;
				let cellData = data[fieldName] || data["properties"][fieldName];
				// 处理数据
				if (cellData instanceof Array) {
					cellData = cellData.join(";");
				} else if (typeof cellData === 'boolean') {
					cellData = cellData ? 1 : 0;
				}
				outRow.push(cellData)
			}
			output.writeRow(outRow);
		});
	}
	writeTableData(nodes, "nodes");
	writeTableData(relationships, "relationships");
	return keyNos;
}

/**
 * 根据url从网页上获取企业和企业关系信息
 * @param url 
 */
function getJsonData(url: string, qccSessionID: string): JSONObject {
	let responseResult = http.request({
		url: url,
		headers: {
			'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
			'Accept-Encoding': 'gzip, deflate, br',
			'Accept-Language': 'zh-CN,zh;q=0.9',
			'Cache-Control': 'private',
			'Connection': 'keep-alive',
			'Cookie': 'QCCSESSID=' + qccSessionID + '; acw_tc=6f30a7b215773419007093357e22bfe013a443770ce031ba800d6ee3cb; zg_did=%7B%22did%22%3A%20%2216f40e75afb284-0638f1c7f281-5373e62-144000-16f40e75afd260%22%7D; UM_distinctid=16f40e75b48550-075aec187e7747-5373e62-144000-16f40e75b492f; Hm_lvt_3456bee468c83cc63fb5147f119f1075=1577341902; _uab_collina=157734190205655381960585; hasShow=1; CNZZDATA1254842228=1613897942-1577341582-%7C1577339172; Hm_lpvt_3456bee468c83cc63fb5147f119f1075=1577343695; zg_de1d1a35bfa24ce29bbf2c7eb17e6c4f=%7B%22sid%22%3A%201577341901573%2C%22updated%22%3A%201577343699124%2C%22info%22%3A%201577341901581%2C%22superProperty%22%3A%20%22%7B%7D%22%2C%22platform%22%3A%20%22%7B%7D%22%2C%22utm%22%3A%20%22%7B%7D%22%2C%22referrerDomain%22%3A%20%22%22%2C%22cuid%22%3A%20%22ed5b05fb227ff484e741a2c5220e5e96%22%7D',
			'Host': 'www.qichacha.com',
			'Referer': 'http://www.qichacha.com/',
			'If-Modified-Since': 'Wed, 24 Oct 2018 12:35:27 GMT',
			'If-None-Match': '"59*******"',
			'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
		},
		method: "GET"
	});

	try {
		let jsonData = JSON.parse(responseResult.responseText);
		let success = jsonData["success"];
		let result = success && success["results"] && success["results"][0];
		let data = result && result.data && result.data[0];
		let graph = data && data.graph;
		return graph;
	}
	catch (e) {
		throw new Error(`接口返回的数据无法解析为JSON:${responseResult.responseText}`);
	}
}

# 备份物理表

import etl from "svr-api/etl";
import db from "svr-api/db";

/**
 * 返回脚本节点字段结构。
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DwTableFieldInfo[] {
	// 备份过程不需要输出,这里保留一个字段用于输出日志
	return [{ name: "脚本执行状态", dataType: FieldDataType.M }];
}

/**
 * 备份物理表脚本。
 * 每次执行备份昨天的最新数据,drop掉7天前的表
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableMeta | any[][] {
	//在这里输入脚本节点的业务逻辑代码
	let tableName = "F_JJHK_TOPICIS";
	let output = context.getOutput();
	if (context.isPreview) {
		output.writeRow(["预览执行"]);
		return null;  // 需要预览时执行就把return注释掉
	}
	let curDate = new Date();
	curDate.setDate(curDate.getDate() - 1);
	let curTable = tableName + "_" + curDate.getMonth() + "_" + curDate.getDate();
	curDate.setDate(curDate.getDate() - 6);
	let table7 = tableName + "_" + curDate.getMonth() + "_" + curDate.getDate();

	let ds = db.getDataSource("Vertica");

	output.writeRow([ds.getDefaultSchema()]); // 调试使用
	if (ds.isTableExists(curTable)) {
		ds.executeUpdate("drop table " + curTable + " CASCADE");
	}
	output.writeRow([curTable]);  // 调试使用

	let sql = "create table " + curTable + " as (select * from " + tableName + ")";
	output.writeRow([sql]);  // 调试使用
	ds.executeUpdate(sql);
	if (ds.isTableExists(table7)) {
		ds.executeUpdate("drop table " + table7 + " CASCADE");
	}
	output.writeRow(["正式执行成功"]);
	return;
}

# 获取api接口的json数据,并解析成结构化数据

import etl from "svr-api/etl";
import { get } from "svr-api/http";

// 定义输出字段
let outputFields = [
	{ name: "LIC_YCER_NAME_CODE", dataType: FieldDataType.C },
	{ name: "LIC_VALID_TIME_END", dataType: FieldDataType.C },
	{ name: "LIC_VALID_TIME_BEGIN", dataType: FieldDataType.C },
	{ name: "LIC_YCER_NAME", dataType: FieldDataType.C },
	{ name: "LIC_NUMBER", dataType: FieldDataType.C },
	{ name: "LIC_HOLDER_CODE", dataType: FieldDataType.C },
	{ name: "ROW_ID", dataType: FieldDataType.C }
]

function onProcessFields(context: IDataFlowScriptNodeContext): DwTableFieldInfo[] {
	return outputFields;
}

// 从json结果的page.list中读取数据
function onProcessData(context: IDataFlowScriptNodeContext): DbTableMeta | any[][] {
	let result = get("api.succsoft.cn/json").responseText;
	let json = JSON.parse(result);
	let list = json.page.list as Array<any>;
	let data = [];
	for (let i = 0, len = list.length; i < len; i++) {
		let row = [];
		for (let j = 0, len1 = outputFields.length; j < len1; j++) {
			row.push(list[i][outputFields[j].name]);
		}
		data.push(row);
	}
	return data;
}

# 读取MongoDB的数据,并解析成结构化数据

通过jdbc连接MongoDB数据库需要org.mongodb:mongodb-driver-sync:4.8.1的jar包,可以通过maven下载对应的jar包后,添加至tomcat目录下的lib文件夹后启动web服务器,并正确配置用户名、密码、IP、端口等信息,本脚本即可正常运行。

import etl from "svr-api/etl";
/** mongodb:[username]:[password]@[ip]:[port] */
const uri = "mongodb://yourusername:yourpassword@192.0.0.1:27017/?authSource=succbi&authMechanism=SCRAM-SHA-256"
const database = "yourdatabase"

/** 样例json字符串 */
const example = '{"_id":"63aabbd8f33226ac32517f2f","_uid":"20865c4edc46474dac5756c296b037c9","dataStatus":"0","fixedAssetClassificationInformation":[{"assetsType":"01","assetsTypeName":"土地、房屋及构筑物","fixedAssetClassificationStandard":"01","fixedAssetClassificationCode":"1010800","fixedAssetClassificationName":"行政单位用地"}],"fixedAssetIDInformation":[{"IDType":"01","IDCode":"111","IDCodeImg":""}],"fixedAssetBusinessInformation":[{"businessName":"","businessCode":"","businessExecutionDate":"","businessExecutor":"","businessType":"","approvalCase":"","businessDescription":""},{"businessName":"房地产处置立项","businessCode":"2","businessExecutionDate":"2022-12-28","businessExecutor":"","businessType":"01","approvalCase":"","businessDescription":""},{"businessName":"资产报废处置","businessCode":"6","businessExecutionDate":"2022-12-29","businessExecutor":"","businessType":"08","approvalCase":"","businessDescription":""},{"businessName":"资产变更","businessCode":"7","businessExecutionDate":"2022-12-29","businessExecutor":"","businessType":"","approvalCase":"","businessDescription":""}],"fixedAssetEntityInformation":{"fixedAssetName":"101","direction":"出借","useState":"01","depreciationState":"01","depreciationMethod":"01","depreciationYearsStatus":"0","depreciationYearsLimit":"","accDepreciation":"","depreciationMonth":"","monthlyDepreciation":"","cardStatus":"","residualValue":"","residualValueRate":"","remarks":""},"fixedAssetManufactureInformation":{"manufacturerName":"","brand":"","specification":"","productId":""},"valueInformation":{"unitPrice":"1","currencyName":"人民币(元)","measureUnit":"立方米","amount":"1","totalPrice":"1"},"fixedAssetPurchaseInformation":{"supplierName":"","contractNumber":"","accountingDocumentNumber":"","invoiceNumber":"","acquirementWay":"","acquirementDate":"","warrantyDeadlineDate":"","estimatedUsefulLife":"","financialAccountingStatus":"","procurementWay":"","budgetItemNumber":"","valueType":"","netBookValue":"","value":"","financialFund":"","nonFinancialFunds":"","financialEntryDate":"","dateOfUse":""},"fixedAssetProprietorInformation":{"userName":"","agencyName":"二级单位101","unitType":"01","unitLevelType":"01","unitNature":"11","departmentName":"","placementLocation":"","organizationCode":"","agencyAddress":"101","zipCode":"101","userDepartment":"","userContactTelephoneNumber":"","userEmail":"","unitUnifiedSocialCreditCode":"101","deviceUsage":"","departmentIdentification":"","financialBudgetCode":"101"},"carInformation":{"plateNo":"","carPl":"","formation":"","carCert":"","certOwner":"","registerDate":"","engineCode":"","carIDCode":"","growPlace":"","repairExpiryDay":"","contractNumber":"","vehicularApplications":""},"houseInformation":{"warmArea":"","dangerArea":"","buildStruct":"","houseRightOwner":"四川机关事务管理局","completeDate":"","designUsage":"","measurementOfStructures":""},"propertyRight":{"propertyRight":"","rightProve":"中华人民共和国房屋所有权证","rightNature":"","rightCertNo":"","rightYear":"","certOwnerName":"","certDate":"","useRightArea":""},"landInformation":{"useRightArea":"","privateArea":"","shareArea":"","useRightType":"","landType":"","landLevel":""},"theMeasureOfArea":{"idleArea":"","selfUseArea":"","lendingArea":"","rentalArea":"","otherAreas":""},"archivalInformation":{"press":"","publicationDate":"","fileNumber":"","shelfLife":""},"specialPurposeInformation":{"ageOfPlanting":"","yearOfPlanting":"","classGenusFamily":"","placeOfOrigin":""},"culturalRelicsInformation":{"source":"","ageOfCollection":"","gradeOfHeritage":""},"intangibleAssets":{"registrationAndRegistrationAuthority":"","approvalNumber":"","patentNumber":"","inventor":"","titleOfInvention":"","theDateOfAuthorizationProclamation":"","certificateNumber":"","patentApplicationNo":"","copyrightCertificateNO":"","copyrightOwner":"","authorizationCertificate":"","authorizationCertificateNo":"","theDeveloper":""}}';

/**
 * 定义字段结构。
 * @param context 
 * @returns 
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DwTableFieldInfo[] {
	return [
		{ name: "资产编号", dataType: FieldDataType.C, length: 50 },
		{ name: "资产分类代码", dataType: FieldDataType.C, length: 20 },
		{ name: "资产分类", dataType: FieldDataType.C, length: 50 },
		{ name: "资产国标大类", dataType: FieldDataType.C, length: 50 },
		{ name: "资产名称", dataType: FieldDataType.C, length: 50 },
	];
}

/**
 * 输出和字段结构相对应的数据。
 * @param context 
 */
function onProcessData(context: IDataFlowScriptNodeContext): void {
	let MongoClients = Java.type("com.mongodb.client.MongoClients");
	let Filters = Java.type("com.mongodb.client.model.Filters");
	let Document = Java.type("org.bson.Document");
	let Sorts = Java.type("com.mongodb.client.model.Sorts");

	let output = context.getOutput();

	let mongoClient;
	try {
		mongoClient = MongoClients.create(uri);
		let db = mongoClient.getDatabase(database);

		let collection = db.getCollection("cars");
		let iterator = collection.find().sort(Sorts.ascending("_id")).limit(100).skip(0).iterator();

		try {
			while (iterator.hasNext()) {
				let result = iterator.next();
				let jsonStr = result.toJson();
				let json = JSON.parse(jsonStr); // 数据格式参考example对象
				let fixedAssetClassificationInformation = json['fixedAssetClassificationInformation'];// 固定资产购置信息
				let fixedAssetEntityInformation = json['fixedAssetEntityInformation'];// 固定资产实体信息
				// 字段顺序和onProcessFields设置的字段顺序一致
				let row: any[] = [];
				row.push(json['_id']);
				row.push(fixedAssetClassificationInformation[0]["fixedAssetClassificationCode"]);// 固定资产分类代码
				row.push(fixedAssetClassificationInformation[0]["fixedAssetClassificationName"]);// 固定资产分类名称
				row.push(fixedAssetClassificationInformation[0]["fixedAssetClassificationStandard"]);// 固定资产分类标准
				row.push(fixedAssetEntityInformation["fixedAssetName"]);// 固定资产名称
				output.writeRow(row); // 输出数据
			}
		}
		finally {
			iterator.close();
		}

	}
	finally {
		mongoClient && mongoClient.close();
	}
}
是否有帮助?
0条评论
评论