# 数据加工脚本组件

SuccBI数据加工提供了常见的加工组件,通过简单的设置就可以使用,但是实际数据加工中可能存在一些无法满足的个性化加工需求,比如:

  1. 调用外部程序,如使用Python从互联网爬取数据、执行Shell命令等。
  2. 调用互联网上的一些API服务来加工数据,比如将企业文本地址转换为经纬度。
  3. 读取一个特殊格式的数据文件进行加工。
  4. 执行一个复杂的业务逻辑(数据库存储过程无法实现)并将结果输出。
  5. 遍历加工流程中的上游节点的输出数据并进行处理后输出给下游加工节点。

为此SuccBI提供了“脚本”加工组件,帮助连接任意位置的任意格式的数据进行加工分析:

# 脚本加工

“脚本”加工节点可以让数据加工过程中拥有“编程”能力,完成任意个性化需求,脚本加工节点提供了如下能力:

  1. 调用外部程序的能力。
  2. 执行SQL的能力。
  3. 脚本编程能力(详见脚本开发须知)。
  4. 访问加工流程的上下文信息的能力。比如获取上游节点输出的数据、获取上游节点产生的SQL、输出数据给下游节点……

# API

# 与脚本相关的数据接口

/**
 * 此文件定义一些数据加工相关的数据类型。
 */

/**
 * Reference to a database table: either a plain table-name string, or an object
 * carrying the table `name` plus an optional `datasourceName` and `schema`.
 */
declare type IDataFlowDbTableName = string | { datasourceName?: string, schema?: string, name: string };

/** A data set represented as an array of rows. */
declare type IDataFlowDataSetArray = Array<IDataFlowDataRow>;

/** A single data row: values of any type keyed by field name. */
declare interface IDataFlowDataRow {
	[fieldName: string]: any;
}

/**
 * Argument passed to a script processing node. Through it the script can read and
 * set all context information of the processing run.
 */
declare interface IDataFlowScriptNodeContext {
	/**
	 * Whether the node is being run to preview data.
	 */
	isPreview: boolean;

	/**
	 * The original query information passed in from the front end.
	 */
	queryInfo: QueryInfo
	
	/**
	 * Returns the number of input nodes of the script node.
	 */
	getInputCount(): number;
	/**
	 * Returns the information of one input node of the script node.
	 * @param index Position of the input node. NOTE(review): the behavior when
	 *              omitted is not stated in this declaration — presumably the first input; confirm.
	 */
	getInput(index?: number): FlowScriptNodeInputData;

	/**
	 * Returns the output object of the script node; the node's output information
	 * is set through this object.
	 */
	getOutput(): FlowScriptNodeOutputData;
	/**
	 * Returns the property information of the script node.
	 */
	getNodeInfo(): DataFlowNodeInfo;
}

/**
 * Information about one input node of a script node.
 */
declare interface FlowScriptNodeInputData {

	/**
	 * Returns the property information of the node.
	 */
	getNodeInfo(): DataFlowNodeInfo;

	/**
	 * Returns the input field structure.
	 */
	getFields(): Array<DbFieldInfo>;

	/**
	 * Returns the query SQL of the preceding node. If that node is a streaming node,
	 * returns the query SQL against the temporary table its data was landed in.
	 */
	getSql(): string;

	/**
	 * Returns whether there is a next row of data. Calling it starts a streaming query.
	 */
	hasNext(): boolean;

	/**
	 * Returns the next row of data.
	 */
	nextRow(): Array<any>;

	/**
	 * Returns the data source of the node.
	 * NOTE: the method name is spelled `getDataSouce` (not `getDataSource`) in this
	 * declaration, so callers must use exactly this spelling.
	 */
	getDataSouce(): string;
}

/**
 * Output object of a script node. The script uses it to define the node's output:
 * its field structure, the physical table or SQL that downstream nodes query, and
 * the rows it emits.
 */
declare interface FlowScriptNodeOutputData {

	/**
	 * Sets the field structure of the script output. If it is never set, the node is
	 * considered to have no output data. (Field type defaults to character, field
	 * length defaults to 50, decimal places of floating-point fields default to 2.)
	 * @param fields The complete list of output fields.
	 */
	setFields(fields: Array<DbFieldInfo>): void;

	/**
	 * Adds a batch of fields. (Field type defaults to character, field length defaults
	 * to 50, decimal places of floating-point fields default to 2.)
	 * @param fields The fields to add.
	 */
	addFields(fields: Array<DbFieldInfo>): void;

	/**
	 * Adds one field. Field type defaults to character, field length defaults to 50,
	 * decimal places of floating-point fields default to 2.
	 * @param field The field to add.
	 * @param index Zero-based insert position; if it exceeds the number of fields, the
	 *              field is placed at the end.
	 */
	addField(field: DbFieldInfo, index?: number): DbFieldInfo;

	/**
	 * Adds one field described by individual attributes.
	 * @param name Field name.
	 * @param dataType Field data type.
	 * @param length Field length.
	 * @param decimal Number of decimal places.
	 * @param dbfield NOTE(review): presumably the physical column name — confirm.
	 *
	 * NOTE(review): presumably the same defaults as the other overload apply when the
	 * optional arguments are omitted; this declaration does not state it.
	 */
	addField(name: string, dataType?: FieldDataType, length?: number, decimal?: number, dbfield?: string): void;

	/**
	 * Removes one field.
	 * @param fieldName Name of the field to remove.
	 */
	removeField(fieldName: string): DbFieldInfo;
	/**
	 * Returns all field definitions.
	 */
	getFields(): Array<DbFieldInfo>;

	/**
	 * Sets the physical table information; downstream nodes query on top of it.
	 * @param ds Data source.
	 * @param schema Schema.
	 * @param name Table name.
	 */
	setDbTableInfo(ds: string, schema: string, name: string): void;

	/**
	 * Sets the SQL; downstream nodes query on top of it.
	 * @param sql The query SQL.
	 * @param ds Data source.
	 * @param schema Schema.
	 */
	setSql(sql: string, ds: string, schema: string): void;

	/**
	 * Emits one row of data to the downstream nodes.
	 * @param row Cell values of the row.
	 */
	writeRow(row: Array<any>): void;
}

# 脚本模板

import etl from "svr-api/etl";

/**
 * Returns the field structure of the script node.
 * Typical uses:
 * 1. A script crawls data into the data warehouse and must produce a predefined field structure.
 * 2. A script parses JSON data and must pre-parse a few rows to derive the fields.
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DbFieldInfo[] {
	// Template placeholder: no fields are declared until the script author adds them.
	const fields: DbFieldInfo[] = [];
	return fields;
}

/**
 * Returns the data of the script node.
 * The result may be a DbTableInfo (its `ddl` holds the query SQL, its `table` holds the
 * corresponding physical table name) or a two-dimensional array of rows.
 * Typical uses:
 * 1. Call an API that updates GIS data and expose the resulting physical table as a model.
 * 2. Call an API to query data, using the filter conditions of a dashboard or report as
 *    parameters, and return the live query result.
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableInfo | any[][] {
	// Template placeholder: nothing is produced until the script author adds logic.
	const result = null;
	return result;
}

# 脚本示例

# 使用高德API查询企业经纬度信息

import etl from "svr-api/etl";

/**
 * 利用高德坐标查询服务,批量转换千万级企业地址数据。
 */
// function onProcessScriptNode(context: IDataFlowScriptNodeContext): void {
//     if (context.isPreview) {//用户在加工界面预览脚本节点的数据
//         //在此处做轻量的查询并显示前50个企业的经纬度就好
//         let data = context.getInputDataAsArray();

//         let addresses = data.map(row => {
//             return {
//                 address1: row["address1"],
//                 address2: row["address2"],
//                 address3: row["address3"],
//             }
//         });
//         let locations = etl.transformLocations({
//             addresses : addresses
//         });

//         data.forEach((row, i) => {
//             row["lng"] = locations[i].lng;
//             row["lat"] = locations[i].lat;
//         });

//         context.setOutputDataArray(data);
//         return;
//     }

//     //TODO 考虑增量的情况 @fengyg
//     /**将上游节点的数据输出到一个物理表中并返回物理表,如果上游节点就是一个物理表,那么直接返回 */
//     let dbTable = context.getInputDataAsDbTable();
//     // 调用接口进行gis经纬度更新
//     etl.transformDbTableLocations({
//         geoserver : 'amap',
//         threadsCount : 16,
//         default_province_prefix : '安徽省',
//         geotable : dbTable,  // 中间表名,可以是物理表或模型表
//         geotable_filter : "",
//         startDate : '',
//         endDate : '',
//         id : 'id_',  //唯一编码
//         district : 'DQ_DM',   //地区编码
//         address : 'dwdz', //地址的字段
//         address2 : 'SYDW',  //单位的名称
//         address3 : '',
//         adddetaillog : false,
//         confidence:1,
//         lng : 'jd',
//         lat : 'wd'
//     });

//     // 设置输出表
//     context.setOutputDataAsDbtable(dbTable);
// }

# 爬取企查查数据作为测试数据

import * as http from "svr-api/http";

/**
 * Returns the field structure of the script node: thirteen fields declared by name
 * only, followed by two `FieldDataType.N` fields with 2 decimal places.
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DbFieldInfo[] {
    // Fields declared with a name only (no explicit data type).
    const plainFieldNames = [
        "tableType", "id", "labels", "keyNo", "registCapi", "name", "econKind",
        "hasImage", "status", "role", "type", "startNode", "endNode",
    ];
    // Fields declared as FieldDataType.N with two decimal places.
    const numericFieldNames = ["stockPercent", "shouldCapi"];

    const fields: DbFieldInfo[] = [];
    plainFieldNames.forEach(fieldName => {
        fields.push({ name: fieldName });
    });
    numericFieldNames.forEach(fieldName => {
        fields.push({ name: fieldName, dataType: FieldDataType.N, decimal: 2 });
    });
    return fields;
}

/**
 * Crawls Qichacha relationship-graph data to use as test data.
 *
 * Starting from the hard-coded company ids, each company's graph is fetched with
 * `getJsonData` and its nodes and relationships are written to the output with
 * `writeData`. The company ids returned by `writeData` form the next level, for
 * `deep` levels in total. A company id is fetched at most once.
 *
 * @param context Script node context; only `getOutput()` is used.
 * @returns Always `null` — rows are handed to `writeData` for output rather than returned.
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableInfo | any[][] {
    // Session id of a logged-in Qichacha user.
    const qccSessionID = "kq4ef60tvq4qla2km8vq2uh6e2";
    // Company ids to start the query from.
    let keyNos = ["a0c394dac4c78d43966ba83e27dd70b9"];
    // Number of relationship levels to follow.
    const deep = 1;
    const urlPrefix = "https://www.qichacha.com/company_muhouAction?keyNo=";

    const outputData = context.getOutput();
    // The first field (tableType) tells whether a row belongs to the relationship
    // table or to the company-information table.
    const fieldsInfo: DbFieldInfo[] = outputData.getFields();
    // NOTE(review): this re-adds the fields that were just read from the same output;
    // kept from the original — confirm it does not duplicate fields.
    outputData.addFields(fieldsInfo);

    // Company ids that have already been fetched.
    const processedKeyNos = new Set<string>();
    const crawlLevel = (levelKeyNos: string[]): string[] => {
        const nextKeyNos: string[] = [];
        levelKeyNos.forEach(keyNo => {
            // A company that was already loaded does not need to be loaded again.
            if (processedKeyNos.has(keyNo)) {
                return;
            }
            // Fetch the graph for this company and write it to the output.
            const graph = getJsonData(urlPrefix + keyNo, qccSessionID);
            // Standard push + spread; Array#pushAll is not part of the language.
            nextKeyNos.push(...writeData(graph, outputData, fieldsInfo));
            processedKeyNos.add(keyNo);
        });
        return nextKeyNos;
    };
    for (let level = 0; level < deep; level++) {
        // The ids found on this level become the companies to fetch on the next one.
        keyNos = crawlLevel(keyNos);
    }
    return null;
}

/**
 * Converts a crawled relationship graph into output rows and writes them to the output node.
 *
 * One row is written per entry of `graph.nodes`, then one per entry of
 * `graph.relationships`. The first cell of every row is the table type ("nodes" or
 * "relationships"); the remaining cells follow `fieldsInfo` from index 1 on, each value
 * read from the item itself or, when the item has none, from the item's `properties`
 * object. Array values are joined with ";" and booleans become 1/0.
 *
 * @param graph Graph object with optional `nodes` and `relationships` arrays;
 *              `null`/`undefined` produces no rows.
 * @param output Output node that receives the rows through `writeRow`.
 * @param fieldsInfo Output field definitions; index 0 is taken to be the table-type
 *                   column and is not looked up on the items.
 * @returns The `keyNo` values found in the `properties` of the written items.
 */
function writeData(graph: JSONObject, output: FlowScriptNodeOutputData, fieldsInfo: DbFieldInfo[]): string[] {
    if (graph == null) {
        return [];
    }
    const keyNos: string[] = [];
    const fieldCount = fieldsInfo.length;

    const writeTableData = (items: any[], tableType: "nodes" | "relationships"): void => {
        // An array never compares equal to a fresh `[]` literal, so emptiness is tested by length.
        if (items == null || items.length === 0) {
            return;
        }
        items.forEach(item => {
            // Items without a `properties` object are treated as having no properties
            // instead of raising a TypeError.
            const properties = item["properties"] || {};
            const keyNo = properties["keyNo"];
            if (keyNo) {
                keyNos.push(keyNo);
            }
            const outRow: any[] = [tableType];
            for (let i = 1; i < fieldCount; i++) {
                const fieldName = fieldsInfo[i].name;
                // Fall back to `properties` only when the item has no value at all, so
                // that 0, false and "" found on the item itself are kept.
                let cellData = item[fieldName] != null ? item[fieldName] : properties[fieldName];
                if (Array.isArray(cellData)) {
                    cellData = cellData.join(";");
                } else if (typeof cellData === "boolean") {
                    cellData = cellData ? 1 : 0;
                }
                outRow.push(cellData);
            }
            output.writeRow(outRow);
        });
    };

    writeTableData(graph.nodes || [], "nodes");
    writeTableData(graph.relationships || [], "relationships");
    return keyNos;
}

/**
 * Fetches the company / company-relationship graph from a web URL.
 *
 * Sends a GET request with browser-like headers and the given session id in the
 * QCCSESSID cookie, parses the response text as JSON and returns
 * `success.results[0].data[0].graph` from it (or the first falsy value met on that path).
 *
 * @param url Address to request.
 * @param qccSessionID Session id placed in the QCCSESSID cookie.
 * @returns The graph object found in the response.
 * @throws Error when the response text cannot be processed as JSON; the message
 *         carries the raw response text.
 */
function getJsonData(url: string, qccSessionID: string): JSONObject {
    const cookie = 'QCCSESSID=' + qccSessionID + '; acw_tc=6f30a7b215773419007093357e22bfe013a443770ce031ba800d6ee3cb; zg_did=%7B%22did%22%3A%20%2216f40e75afb284-0638f1c7f281-5373e62-144000-16f40e75afd260%22%7D; UM_distinctid=16f40e75b48550-075aec187e7747-5373e62-144000-16f40e75b492f; Hm_lvt_3456bee468c83cc63fb5147f119f1075=1577341902; _uab_collina=157734190205655381960585; hasShow=1; CNZZDATA1254842228=1613897942-1577341582-%7C1577339172; Hm_lpvt_3456bee468c83cc63fb5147f119f1075=1577343695; zg_de1d1a35bfa24ce29bbf2c7eb17e6c4f=%7B%22sid%22%3A%201577341901573%2C%22updated%22%3A%201577343699124%2C%22info%22%3A%201577341901581%2C%22superProperty%22%3A%20%22%7B%7D%22%2C%22platform%22%3A%20%22%7B%7D%22%2C%22utm%22%3A%20%22%7B%7D%22%2C%22referrerDomain%22%3A%20%22%22%2C%22cuid%22%3A%20%22ed5b05fb227ff484e741a2c5220e5e96%22%7D';
    // Headers copied from a browser session.
    const requestHeaders = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cache-Control': 'private',
        'Connection': 'keep-alive',
        'Cookie': cookie,
        'Host': 'www.qichacha.com',
        'Referer': 'http://www.qichacha.com/',
        'If-Modified-Since': 'Wed, 24 Oct 2018 12:35:27 GMT',
        'If-None-Match': '"59*******"',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
    };
    const response = http.request({
        url: url,
        headers: requestHeaders,
        method: "GET"
    });

    try {
        // Walk success -> results[0] -> data[0] -> graph, stopping at the first falsy step.
        const success = JSON.parse(response.responseText)["success"];
        const results = success && success["results"];
        const firstResult = results && results[0];
        const dataList = firstResult && firstResult.data;
        const firstData = dataList && dataList[0];
        return firstData && firstData.graph;
    }
    catch (e) {
        throw new Error(`接口返回的数据无法解析为JSON:${response.responseText}`);
    }
}

# 备份物理表

import etl from "svr-api/etl";
import db from "svr-api/db";

/**
 * Returns the field structure of the script node: a single field named
 * "脚本执行状态" with data type `FieldDataType.M`.
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DbFieldInfo[] {
    const statusField: DbFieldInfo = { name: "脚本执行状态", dataType: FieldDataType.M };
    return [statusField];
}

/**
 * Backs up a physical table.
 *
 * Each non-preview run copies the table into a backup table named after yesterday's
 * date (replacing a backup of that name if one exists) and drops the backup table
 * named after the date 7 days ago. Backup tables are named `<table>_<month>_<day>`,
 * with the month in the range 1-12.
 *
 * Several rows are written to the output purely as debugging / status information.
 *
 * @param context Script node context; `isPreview` and `getOutput()` are used.
 * @returns Always `null`; the outcome is reported through rows written to the output.
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableInfo | any[][] {
    const tableName = "F_JJHK_TOPICIS";
    const output = context.getOutput();
    if (context.isPreview) {
        output.writeRow(["预览执行"]);
        return null;  // Comment out this return to run the backup during preview as well.
    }

    const today = new Date();
    // Name of the backup table for the date `daysAgo` days before today.
    const backupTableName = (daysAgo: number): string => {
        const day = new Date(today.getTime());
        day.setDate(day.getDate() - daysAgo);
        // Date#getMonth() is zero-based; add 1 so the name carries the calendar month.
        return tableName + "_" + (day.getMonth() + 1) + "_" + day.getDate();
    };
    const curTable = backupTableName(1);
    const table7 = backupTableName(7);

    const ds = db.getDataSource("Vertica");

    output.writeRow([ds.getDefaultSchema()]); // For debugging.
    // Replace a backup of the same name left over from an earlier run.
    if (ds.isTableExists(curTable)) {
        ds.executeUpdate("drop table " + curTable + " CASCADE");
    }
    output.writeRow([curTable]);  // For debugging.

    const sql = "create table " + curTable + " as (select * from " + tableName + ")";
    output.writeRow([sql]);  // For debugging.
    ds.executeUpdate(sql);
    // Retire the backup that is now 7 days old.
    if (ds.isTableExists(table7)) {
        ds.executeUpdate("drop table " + table7 + " CASCADE");
    }
    output.writeRow(["正式执行成功"]);
    return null;
}

# 获取api接口的json数据,并解析成结构化数据

import etl from "svr-api/etl";
import { get } from "svr-api/http";

// Output field definitions, in output order; every field is typed FieldDataType.C.
let outputFields = [
    "LIC_YCER_NAME_CODE",
    "LIC_VALID_TIME_END",
    "LIC_VALID_TIME_BEGIN",
    "LIC_YCER_NAME",
    "LIC_NUMBER",
    "LIC_HOLDER_CODE",
    "ROW_ID",
].map(fieldName => ({ name: fieldName, dataType: FieldDataType.C }))

/**
 * Returns the field structure of the script node: the module-level `outputFields` list.
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DbFieldInfo[] {
    const fields: DbFieldInfo[] = outputFields;
    return fields;
}

/**
 * Fetches JSON from the API and returns the records under `page.list` as rows.
 * Each row holds one record's values for the field names in `outputFields`, in that order.
 */
function onProcessData(context: IDataFlowScriptNodeContext): DbTableInfo | any[][] {
    const responseText = get("api.succsoft.cn/json");
    const records = JSON.parse(responseText).page.list as Array<any>;
    // One row per record, one cell per output field.
    return records.map(record => outputFields.map(field => record[field.name]));
}

# 读取MongoDB的数据,并解析成结构化数据

本脚本通过MongoDB官方Java驱动(而非JDBC)连接MongoDB数据库,需要org.mongodb:mongodb-driver-sync:4.8.1的jar包及其依赖(mongodb-driver-core、bson)。可以通过Maven下载对应的jar包,添加至Tomcat目录下的lib文件夹后重启Web服务器;再在脚本中正确配置用户名、密码、IP、端口等信息,本脚本即可正常运行。

import etl from "svr-api/etl";

/** Connection string, in the form mongodb://[username]:[password]@[ip]:[port]/?[options] */
const uri = "mongodb://yourusername:yourpassword@192.0.0.1:27017/?authSource=succbi&authMechanism=SCRAM-SHA-256"
/** Name of the MongoDB database the script opens. */
const database = "yourdatabase"

/**
 * Sample JSON string showing the shape of one document.
 * Kept on a single line: a single-quoted string literal cannot span lines.
 */
const example = '{"_id":"63aabbd8f33226ac32517f2f","_uid":"20865c4edc46474dac5756c296b037c9","dataStatus":"0","fixedAssetClassificationInformation":[{"assetsType":"01","assetsTypeName":"土地、房屋及构筑物","fixedAssetClassificationStandard":"01","fixedAssetClassificationCode":"1010800","fixedAssetClassificationName":"行政单位用地"}],"fixedAssetIDInformation":[{"IDType":"01","IDCode":"111","IDCodeImg":""}],"fixedAssetBusinessInformation":[{"businessName":"","businessCode":"","businessExecutionDate":"","businessExecutor":"","businessType":"","approvalCase":"","businessDescription":""},{"businessName":"房地产处置立项","businessCode":"2","businessExecutionDate":"2022-12-28","businessExecutor":"","businessType":"01","approvalCase":"","businessDescription":""},{"businessName":"资产报废处置","businessCode":"6","businessExecutionDate":"2022-12-29","businessExecutor":"","businessType":"08","approvalCase":"","businessDescription":""},{"businessName":"资产变更","businessCode":"7","businessExecutionDate":"2022-12-29","businessExecutor":"","businessType":"","approvalCase":"","businessDescription":""}],"fixedAssetEntityInformation":{"fixedAssetName":"101","direction":"出借","useState":"01","depreciationState":"01","depreciationMethod":"01","depreciationYearsStatus":"0","depreciationYearsLimit":"","accDepreciation":"","depreciationMonth":"","monthlyDepreciation":"","cardStatus":"","residualValue":"","residualValueRate":"","remarks":""},"fixedAssetManufactureInformation":{"manufacturerName":"","brand":"","specification":"","productId":""},"valueInformation":{"unitPrice":"1","currencyName":"人民币(元)","measureUnit":"立方米","amount":"1","totalPrice":"1"},"fixedAssetPurchaseInformation":{"supplierName":"","contractNumber":"","accountingDocumentNumber":"","invoiceNumber":"","acquirementWay":"","acquirementDate":"","warrantyDeadlineDate":"","estimatedUsefulLife":"","financialAccountingStatus":"","procurementWay":"","budgetItemNumber":"","valueType":"","netBookValue":"","value":"","financialFund":"","nonFinancialFunds":"","financialEntryDate":"","dateOfUse":""},"fixedAssetProprietorInformation":{"userName":"","agencyName":"二级单位101","unitType":"01","unitLevelType":"01","unitNature":"11","departmentName":"","placementLocation":"","organizationCode":"","agencyAddress":"101","zipCode":"101","userDepartment":"","userContactTelephoneNumber":"","userEmail":"","unitUnifiedSocialCreditCode":"101","deviceUsage":"","departmentIdentification":"","financialBudgetCode":"101"},"carInformation":{"plateNo":"","carPl":"","formation":"","carCert":"","certOwner":"","registerDate":"","engineCode":"","carIDCode":"","growPlace":"","repairExpiryDay":"","contractNumber":"","vehicularApplications":""},"houseInformation":{"warmArea":"","dangerArea":"","buildStruct":"","houseRightOwner":"四川机关事务管理局","completeDate":"","designUsage":"","measurementOfStructures":""},"propertyRight":{"propertyRight":"","rightProve":"中华人民共和国房屋所有权证","rightNature":"","rightCertNo":"","rightYear":"","certOwnerName":"","certDate":"","useRightArea":""},"landInformation":{"useRightArea":"","privateArea":"","shareArea":"","useRightType":"","landType":"","landLevel":""},"theMeasureOfArea":{"idleArea":"","selfUseArea":"","lendingArea":"","rentalArea":"","otherAreas":""},"archivalInformation":{"press":"","publicationDate":"","fileNumber":"","shelfLife":""},"specialPurposeInformation":{"ageOfPlanting":"","yearOfPlanting":"","classGenusFamily":"","placeOfOrigin":""},"culturalRelicsInformation":{"source":"","ageOfCollection":"","gradeOfHeritage":""},"intangibleAssets":{"registrationAndRegistrationAuthority":"","approvalNumber":"","patentNumber":"","inventor":"","titleOfInvention":"","theDateOfAuthorizationProclamation":"","certificateNumber":"","patentApplicationNo":"","copyrightCertificateNO":"","copyrightOwner":"","authorizationCertificate":"","authorizationCertificateNo":"","theDeveloper":""}}';

/**
 * Defines the field structure of the script node: five `FieldDataType.C` fields,
 * each with an explicit length.
 * @param context Script node context (not used).
 * @returns The output field definitions, in output order.
 */
function onProcessFields(context: IDataFlowScriptNodeContext): DbFieldInfo[] {
    // Builds one FieldDataType.C field definition.
    const textField = (fieldName: string, fieldLength: number): DbFieldInfo => {
        return { name: fieldName, dataType: FieldDataType.C, length: fieldLength };
    };
    return [
        textField("资产编号", 50),
        textField("资产分类代码", 20),
        textField("资产分类", 50),
        textField("资产国标大类", 50),
        textField("资产名称", 50),
    ];
}

/**
 * Reads documents from MongoDB and writes one output row per document, with the cells
 * in the same order as the fields declared by onProcessFields.
 *
 * The MongoDB Java driver classes are obtained through `Java.type`. The first 100
 * documents of the "cars" collection, sorted by `_id` ascending, are read. The cursor
 * and the client are closed even when reading fails.
 *
 * @param context Script node context; only `getOutput()` is used.
 */
function onProcessData(context: IDataFlowScriptNodeContext): void {
    const MongoClients = Java.type("com.mongodb.client.MongoClients");
    const Sorts = Java.type("com.mongodb.client.model.Sorts");

    const output = context.getOutput();

    let mongoClient;
    try {
        mongoClient = MongoClients.create(uri);
        const collection = mongoClient.getDatabase(database).getCollection("cars");
        const iterator = collection.find().sort(Sorts.ascending("_id")).limit(100).skip(0).iterator();

        try {
            while (iterator.hasNext()) {
                // The document shape is illustrated by the `example` constant.
                const json = JSON.parse(iterator.next().toJson());

                // toJson() emits MongoDB Extended JSON, in which an ObjectId `_id` is
                // rendered as {"$oid": "..."}; unwrap it so the cell holds the id string.
                const rawId = json["_id"];
                const id = rawId != null && typeof rawId === "object" && "$oid" in rawId ? rawId["$oid"] : rawId;

                // Fixed-asset classification information; only the first entry is used.
                // A document without it yields empty cells instead of aborting the run.
                const classifications = json["fixedAssetClassificationInformation"];
                const classification = (Array.isArray(classifications) && classifications[0]) || {};
                // Fixed-asset entity information.
                const entity = json["fixedAssetEntityInformation"] || {};

                // Cell order matches the field order set in onProcessFields.
                const row: any[] = [
                    id,
                    classification["fixedAssetClassificationCode"],     // classification code
                    classification["fixedAssetClassificationName"],     // classification name
                    classification["fixedAssetClassificationStandard"], // classification standard
                    entity["fixedAssetName"],                           // asset name
                ];
                output.writeRow(row);
            }
        }
        finally {
            iterator.close();
        }

    }
    finally {
        mongoClient && mongoClient.close();
    }
}
是否有帮助?
0条评论
评论