博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Pig源码分析】谈谈Pig的数据模型
阅读量:5901 次
发布时间:2019-06-19

本文共 7681 字,大约阅读时间需要 25 分钟。

1. 数据模型

Schema

Pig Latin表达式操作的是relation,FILTER、FOREACH、GROUP、SPLIT等关系操作符所操作的relation就是bag,bag为tuple的集合,tuple为有序的field列表集合,而field表示数据块(A field is a piece of data),可理解为数据字段。

Schema为数据所遵从的类型格式,包括:field的名称及类型(names and types)。用户常用as语句来自定义schema,或是load函数导入schema,比如:

A = foreach X generate .. as field1:chararray, .. as field2:bag{};A = load '..' using PigStorage('\t', '-schema');A = load '..' using org.apache.pig.piggybank.storage.avro.AvroStorage();

若不指定field的类型,则其默认为bytearray。对未知schema进行操作时,有:

  • 若join/cogroup/cross多关系操作遇到未知schema,则会将其视为null schema,导致返回结果的schema也为null;
  • 若flatten一个empty inner schema的bag(即:bag{})时,则返回结果的schema为null;
  • 若union时二者relation的schema不一致,则返回结果的schema为null;
  • 若field的schema为null,会将该字段视为bytearray。

为了保证pig脚本运行的有效性,在写UDF时要在outputSchema方法中指定返回结果的schema。

数据类型

Pig的基本数据类型与对应的Java类:

Simple Pig Type Example Java Class
bytearray DataByteArray
chararray 'hello world' String
int 10 Integer
long 10L Long
float 10.5F or 1050.0F Float
double Double
boolean true/false Boolean
datetime DateTime
bigdecimal BigDecimal
biginteger BigInteger

复杂数据类型及其对应的Java类:

Complex Pig Type Example Java Class
tuple (19, 'hello') Tuple
bag {('hello'), (18, 1)} DataBag
map [open#apache] Map

Pig的复杂数据类型可以嵌套表达,比如:tuple中有tuple (a, (b, c, d)),tuple中有bag (a, {(b,c), (d,e)})等等。但是一定要遵从数据类型本身的定义,比如:bag中只能是tuple的集合,比如{a, {(b),(c)}}就是不合法的。

Pig还有一种特殊的数据类型:null,与Java、C中null不一样,其表示不知道的或不存在的数据类型(unknown or non-existent)。比如,在load数据时,如果有的数据行字段不符合定义的schema,则该字段会被置为null。

2. 源码分析

以下源码分析采用的是0.12版本。

Tuple

在KEYSET源码中,创建Tuple对象采用工厂+单例设计模式

private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();Tuple t = TUPLE_FACTORY.newTuple(s);

事实上,TupleFactory是个抽象类,实现接口TupleMaker<Tuple>。在方法TupleFactory.getInstance()中,默认情况下返回的是BinSedesTupleFactory对象,同时支持加载用户重写的TupleFactory类(pig.data.tuple.factory.name指定类名、 pig.data.tuple.factory.jar指定类所在的jar)。BinSedesTupleFactory继承于TupleFactory:

399159-20160120141907140-1837931798.png

在BinSedesTupleFactory的newTuple方法中,返回的是BinSedesTuple对象。BinSedesTuple类继承于DefaultTuple类,在DefaultTuple类中有List<Object> mFields字段,这便是存储Tuple数据的地方了,mFields所持有类型为ArrayList<Object>();。类图关系:

399159-20160120141918578-1275166361.png

Bag

创建Bag对象有下面几种方法:

// factoryBagFactory mBagFactory = BagFactory.getInstance();DataBag output = mBagFactory.newDefaultBag();// if you know upfront how many tuples you are going  to put in this bag.DataBag bag = new NonSpillableDataBag(m.size());

与TupleFactory一样,BagFactory也是抽象类,也支持用户自定义重写;getInstance方法默认返回的是DefaultBagFactory。DefaultBagFactory有newDefaultBag、newSortedBag、newDistinctBag方法分别创建三类bag:

  • default bag中的tuple没有排序,也没有去重;
  • sorted bag中的tuple是按序存放,顺序是由tuple default comparator或bag创建时的comparator所定义的;
  • distinct bag顾名思义,tuple有去重。

三类bag的构造器如下:

public DefaultDataBag() {    mContents = new ArrayList
();}public SortedDataBag(Comparator
comp) { mComp = (comp == null) ? new DefaultComparator() : comp; mContents = new ArrayList
();}public DistinctDataBag() { mContents = new HashSet
();}

BagFactory的类图:

399159-20160120141931297-1758422360.png

DefaultAbstractBag作为三种类型bag的基类,有一个字段mContents用于存放tuple,NonSpillableDataBag直接实现DataBag接口。DataBag的类图:

399159-20160120141937218-296525038.png

3. 实战

现有avro日志数据(见),其字段:

  • dvc表示用户手机标识;
  • appUseappInstall同为avro Map类型,其key为app名称(app name),value为Map<String, Object>,包含了一个表示使用时间的字段timelist(类型为ArrayList);具体格式如下
'dvc': 'imei_123','appUse': {    'app name1': {        ...        'timelist': [...]    },    'app name2': {        ...        'timelist': [...]    },    ...},'appInstall': {    'app name1': {        ...        'timelist': [...]    },    ...}

现在,想要得到每个用户的app列表及app的打开次数,以格式dvc, {(app)}, {(app, frequency)}输出,即用户 + app列表 + 使用次数类表。如果用MapRduce做,得分为以下步骤:

  1. 以(dvc, app)为key值,计算value值为使用次数;
  2. 以dvc为key值,合并同一用户的不同app,value值为(app, fre);
  3. 以dvc为key值,计算appinstall的app列表;
  4. 将步骤2得到的数据与步骤3得到的数据做join,然后输出。

可以看出用MapReduce略显繁复,如何来用pig来实现呢?我们可以对appUse:map[]编写EVAL UDF,让其返回(app名称, timelist的长度) :

public class AppTimelist  extends EvalFunc
{ private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance(); private static final BagFactory BAG_FACTORY = BagFactory.getInstance(); @SuppressWarnings({ "unchecked" }) @Override public DataBag exec(Tuple input) throws IOException { Map
> m = (Map
>) input.get(0); List
result = new ArrayList(); DataBag output = BAG_FACTORY.newDefaultBag(); if(m == null) return null; for(Map.Entry
> e: m.entrySet()) { result.clear(); String app = e.getKey(); long size = ((DataBag) e.getValue().get("timelist")).size(); result.add(app); result.add(size); output.add(TUPLE_FACTORY.newTuple(result)); } return output; }}

pig将Java的ArrayList转成DataBag的类型,所以要对timelist进行强转操作。

appInstall:map[]编写EVAL UDF,返回(appList):

public class DistinctBag extends EvalFunc
{ BagFactory mBagFactory = BagFactory.getInstance(); @Override public DataBag exec(Tuple input) throws IOException { if(input == null || input.size() == 0) { return null; } DataBag in = (DataBag) input.get(0); DataBag out = mBagFactory.newDistinctBag(); if(in == null) { return null; } for(Tuple tp: in) { DataBag applist = (DataBag) tp.get(0); for(Tuple app: applist) out.add(app); } return out; }}

上面提到过,若没有给EVAL UDF指定返回值的schema,则返回结果的schema为null,如此会造成类型的丢失,在后面的操作中容易报NullPointerException。

// AppTimelist.java@Overridepublic Schema outputSchema(Schema input) {    try {        Schema tupleSchema = new Schema();        FieldSchema chararrayFieldSchema = new Schema.FieldSchema(null, DataType.CHARARRAY);        FieldSchema longFieldSchema = new Schema.FieldSchema(null, DataType.LONG);        tupleSchema.add(chararrayFieldSchema);        tupleSchema.add(longFieldSchema);        return new Schema(new Schema.FieldSchema(getSchemaName(this                .getClass().getName().toLowerCase(), input), tupleSchema,                DataType.TUPLE));    } catch (Exception e) {        return null;    }}// DistinctBag.java@Overridepublic Schema outputSchema(Schema input) {    FieldSchema innerFieldSchema = new Schema.FieldSchema(null, DataType.CHARARRAY);    Schema innerSchema = new Schema(innerFieldSchema);    Schema bagSchema = null;    try {        bagSchema = new Schema(new FieldSchema(null, innerSchema, DataType.BAG));    } catch(FrontendException e) {        throw new RuntimeException(e);    }    return bagSchema;}

统计app列表:

define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;define DistinctBag com.pig.udf.bag.DistinctBag;A = load '..' using AvroStorage();B = foreach A generate value.fields.data#'dvc' as dvc:chararray, value.fields.data#'appInstall' as ins:map[map[]];C = foreach B generate dvc, KEYSET(ins) as applist;D = group C by dvc;-- extract applist from grouped DE = foreach D {    projected = foreach $1 generate applist;    generate group as dvc, projected as grouped;}F = foreach E generate dvc, DistinctBag(grouped) as applist;store F into '..' using AvroStorage();

统计app使用时长:

define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;define AppTimelist com.pig.udf.map.AppTimelist;A = load '..' using AvroStorage();B = foreach A generate value.fields.data#'dvc' as dvc:chararray, value.fields.data#'appUse' as use:map[map[]];C = foreach B generate dvc, flatten(AppTimelist(use)) as (app, fre);D = group C by (dvc, app);E = foreach D generate flatten(group) as (dvc, app), SUM($1.fre) as fre;F = group E by dvc;G = foreach F {        projected = foreach $1 generate app, fre;        generate group as dvc, projected as appfre;}store G into '..' using AvroStorage();

二者做join即可得到结果。

转载于:https://www.cnblogs.com/en-heng/p/5145054.html

你可能感兴趣的文章
occActiveX - ActiveX with OpenCASCADE
查看>>
BeanUtils\DBUtils
查看>>
python模块--os模块
查看>>
Java 数组在内存中的结构
查看>>
《关爱码农成长计划》第一期报告
查看>>
学习进度表 04
查看>>
谈谈javascript中的prototype与继承
查看>>
时序约束优先级_Vivado工程经验与各种时序约束技巧分享
查看>>
minio 并发数_MinIO 参数解析与限制
查看>>
flash back mysql_mysqlbinlog flashback 使用最佳实践
查看>>
mysql存储引擎模式_MySQL存储引擎
查看>>
java 重写system.out_重写System.out.println(String x)方法
查看>>
配置ORACLE 11g绿色版客户端和PLSQL远程连接环境
查看>>
ASP.NET中 DataList(数据列表)的使用前台绑定
查看>>
Linux学习之CentOS(八)--Linux系统的分区概念
查看>>
System.Func<>与System.Action<>
查看>>
asp.net开源CMS推荐
查看>>
csharp skype send message in winform
查看>>
MMORPG 游戏服务器端设计--转载
查看>>
HDFS dfsclient写文件过程 源码分析
查看>>