- 浏览: 26663 次
- 性别:
- 来自: 北京
最新评论
-
pinkmoon:
HBase 0.96配置 snappy(绝对有效哦亲) -
pinkmoon:
记一次痛苦的 hadoop 2编译过程 -
半点玻璃心:
dsx1013 写道你好,我有snappy 源码安装,没有指定 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
dsx1013:
你好,我有snappy 源码安装,没有指定安装目录,默认安装路 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
半点玻璃心:
推文7 写道你好,我也遇到了这个问题,能否麻烦把您编译的had ...
HBase 0.96配置 snappy(绝对有效哦亲)
又回来了,还是看put,不过版本号变了,希望看0.94的童靴移驾到http://dennis-lee-gammy.iteye.com/admin/blogs/1972269
put和doput方法变化不大,唯一就是原来的缓存队列名字里面加了一个async,然后类型由ArrayList变成了LinkedList。
flushCommit方法
变化真大啊,原来42行代码一下只有这么点了,以前核心功能由
正常流程几乎完全找不到以前的影子!这里多出来一个处理类org.apache.hadoop.hbase.client.AsyncProcess,即ap成员。这个类是0.94版的代码里面完全没有的。难怪变化那么大。
首先这里有一个参数,指定为同步执行还是异步执行。从上面的doput方法和flushcommit方法可以看出,如果在doput的过程中,也就是调用htable.put(Put)的时候,如果缓存大小超过了客户端写缓存大小的限制,调用这个方法是异步的;而在flushcommit方法中,这个方法是同步的。这里也暴露出来一个与原有流程不同的地方,0.94中doput如果超过大小限制,是委托flushcommit方法提交的,而这里采用了一种更加柔和的方式。另外,那个htable的线程池成员在方法中也找不到它的影子了,以前可是带着到处跑的。
主流程差不多就完成了。重要的两个流程:请求和处理响应,应该是在
那什么情况下表示有空闲资源呢,看看【1】处的相关代码
备注【3】,定位Resion,大操作,详见http://dennis-lee-gammy.iteye.com/admin/blogs/1973255
备注【5】,发送请求,大头,这里先放放
put和doput方法变化不大,唯一就是原来的缓存队列名字里面加了一个async,然后类型由ArrayList变成了LinkedList。
flushCommit方法
public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { // We're looping, as if one region is overloaded we keep its operations in the buffer. // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. do { backgroundFlushCommits(true); } while (!writeAsyncBuffer.isEmpty()); }
private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { if (ap.hasError()){ backgroundFlushCommits(true); } validatePut(put); currentWriteBufferSize += put.heapSize(); writeAsyncBuffer.add(put); while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); }
变化真大啊,原来42行代码一下只有这么点了,以前核心功能由
connection.processBatch(writeBuffer, tableName, pool, results);完成,这里变成了循环。以前还会检查并保存执行失败的操作返回到缓存列表中,这里第一眼是看不到这些了。看看backgroundFlushCommits 卖的是神马药。
private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException { try { // If there is an error on the operations in progress, we don't add new operations. if (writeAsyncBuffer.size() > 0 && !ap.hasError()) { ap.submit(writeAsyncBuffer, true);//如果任务队列没有清空,并且异步执行器没有问题,则执行提交操作 } if (synchronous || ap.hasError()) { if (ap.hasError() && LOG.isDebugEnabled()) { LOG.debug(tableName + ": One or more of the operations have failed -" + " waiting for all operation in progress to finish (successfully or not)"); } ap.waitUntilDone();//如果是同步模式,或者出现了错误,则都变成同步模式,需要等待完成 } if (ap.hasError()) { if (!clearBufferOnFail) { // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the // write buffer. This is a questionable feature kept here for backward compatibility // 如果不是失败则清除模式,则保存失败的操作,功能与0.94版本是一致的,不过原来的版本在提交任务的时候 // 会一并上传一个结果集合,顺序与任务提交的顺序一一对应。顺序取回结果查看是否成功, // 并将成功的操作从缓存队列中移除。 // 而现在的代码,表面上看应该是在某个地方已经清空了,然后ap负责记录并返回失败的操作 writeAsyncBuffer.addAll(ap.getFailedOperations()); } // 目测ap已经完成了重试,并记录了应有的异常 RetriesExhaustedWithDetailsException e = ap.getErrors(); ap.clearErrors(); throw e; } } finally { currentWriteBufferSize = 0; for (Row mut : writeAsyncBuffer) { if (mut instanceof Mutation) { currentWriteBufferSize += ((Mutation) mut).heapSize();//既然缓存队列之前已经被清除过,也就不用判断是否是失败清除模式了,简单的计算下缓存大小吧。 } } } }
正常流程几乎完全找不到以前的影子!这里多出来一个处理类org.apache.hadoop.hbase.client.AsyncProcess,即ap成员。这个类是0.94版的代码里面完全没有的。难怪变化那么大。
首先这里有一个参数,指定为同步执行还是异步执行。从上面的doput方法和flushcommit方法可以看出,如果在doput的过程中,也就是调用htable.put(Put)的时候,如果缓存大小超过了客户端写缓存大小的限制,调用这个方法是异步的;而在flushcommit方法中,这个方法是同步的。这里也暴露出来一个与原有流程不同的地方,0.94中doput如果超过大小限制,是委托flushcommit方法提交的,而这里采用了一种更加柔和的方式。另外,那个htable的线程池成员在方法中也找不到它的影子了,以前可是带着到处跑的。
主流程差不多就完成了。重要的两个流程:请求和处理响应,应该是在
ap.submit(writeAsyncBuffer, true)和
ap.waitUntilDone();中实现。继续吧
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException { if (rows.isEmpty()) { return; } // This looks like we are keying by region but HRegionLocation has a comparator that compares // on the server portion only (hostname + port) so this Map collects regions by server. // 熟悉的面孔,这不是94中HConnectionImplementation.processBatchCallback(list, tableName, pool, results, null) // step 1 第一行么,原来跑这里来了,HRegionLocation --> MultiAction<Row> 的字典结构。 Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>(); List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size()); do { // Wait until there is at least one slot for a new task. // 等待空闲资源执行操作,maxTotalConcurrentTasks =hbase.client.max.total.tasks // 默认100,配置文件里面没有哦,亲 TODO【1】 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); // Remember the previous decisions about regions or region servers we put in the // final multi. Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>(); Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>(); int posInList = -1; Iterator<? extends Row> it = rows.iterator(); while (it.hasNext()) { Row r = it.next(); HRegionLocation loc = findDestLocation(r, 1, posInList);//定位region TODO【2】 if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {//判断region TODO【3】 // loc is null if there is an error such as meta not available. Action<Row> action = new Action<Row>(r, ++posInList); retainedActions.add(action); addAction(loc, action, actionsByServer);//添加操作 ,跟之前的step 1里面的步骤一致,multiAction按HRegionLocation聚类 it.remove();//果然,缓存队列在这里会被逐步清空 } } } while (retainedActions.isEmpty() && atLeastOne && !hasError()); HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker(); //创建跟踪异常,如果需要创建(hbase.client.retries.by.server指定,配置文件没有,默认为true),则返回一个 //HConnectionManager.ServerErrorTracker sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);//发送请求 TODO【4】 }
那什么情况下表示有空闲资源呢,看看【1】处的相关代码
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTimeMillis(); long currentTasksDone = this.tasksDone.get(); while ((tasksSent.get() - currentTasksDone) > max) {//如果已发送的任务跟已经完成的任务数差值过大 long now = EnvironmentEdgeManager.currentTimeMillis(); if (now > lastLog + 10000) { lastLog = now; LOG.info(": Waiting for the global number of running tasks to be equals or less than " + max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() + ", currentTasksDone=" + currentTasksDone + ", tableName=" + tableName); } waitForNextTaskDone(currentTasksDone);//等待下一个任务完成 currentTasksDone = this.tasksDone.get();//看看完成了多少个 } } //这个简单,如果已完成任务数没有变化就等100ms protected void waitForNextTaskDone(long currentNumberOfTask) throws InterruptedIOException { while (currentNumberOfTask == tasksDone.get()) { try { synchronized (this.tasksDone) { this.tasksDone.wait(100); } } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted." + " currentNumberOfTask=" + currentNumberOfTask + ", tableName=" + tableName + ", tasksDone=" + tasksDone.get()); } } }
protected boolean canTakeOperation(HRegionLocation loc, Map<String, Boolean> regionsIncluded, Map<ServerName, Boolean> serversIncluded) { String encodedRegionName = loc.getRegionInfo().getEncodedName(); Boolean regionPrevious = regionsIncluded.get(encodedRegionName); //之前已经有这个region信息,则直接返回以保存的结果,这里有个问题,如果region信息有更新呢?估计在后面的代码里面。 if (regionPrevious != null) { // We already know what to do with this region. return regionPrevious; } //没有的话看看RS的信息,如果RS已经挂了,那么他对应的所有region都挂,不用看了,记录一下告诉上层吧 Boolean serverPrevious = serversIncluded.get(loc.getServerName()); if (Boolean.FALSE.equals(serverPrevious)) { // It's a new region, on a region server that we have already excluded. regionsIncluded.put(encodedRegionName, Boolean.FALSE); return false; } AtomicInteger regionCnt = taskCounterPerRegion.get(encodedRegionName); if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) { // Too many tasks on this region already.hbase.client.max.perregion.tasks设置,默认为1哦,配置文件没有哦亲,每次只能运行一个任务?这个设置MS有点坑,后续看看改大了会不会有影响 regionsIncluded.put(encodedRegionName, Boolean.FALSE); return false; } if (serverPrevious == null) { // The region is ok, but we need to decide for this region server. int newServers = 0; // number of servers we're going to contact so far for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) { if (kv.getValue()) { newServers++; } } // Do we have too many total tasks already? 如果server的数量与等待完成的任务之和小于最大任务数(之前说过,默认100) boolean ok = (newServers + getCurrentTasksCount()) < maxTotalConcurrentTasks; if (ok) { //在检查是否每个server能承受的最大任务数hbase.client.max.perserver.tasks=5,怎么都那么小呢,还不能在配置文件里面找到,坑死了啊 // If the total is fine, is it ok for this individual server? AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName()); ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer); } // 如果检查失败,RS和Region都设置为false if (!ok) { regionsIncluded.put(encodedRegionName, Boolean.FALSE); serversIncluded.put(loc.getServerName(), Boolean.FALSE); return false; } serversIncluded.put(loc.getServerName(), Boolean.TRUE); } else { assert serverPrevious.equals(Boolean.TRUE); } regionsIncluded.put(encodedRegionName, Boolean.TRUE); return true; }
备注【3】,定位Resion,大操作,详见http://dennis-lee-gammy.iteye.com/admin/blogs/1973255
备注【5】,发送请求,大头,这里先放放
发表评论
-
hbase MemStoreLAB代码浅析-1
2014-09-30 17:21 1101本文基于 hbase 0.98x,如果发现源码与你的副本不符合 ... -
HBase 0.96 服务端写流程代码阅读笔记
2014-02-24 15:36 0private long doMiniBatchMutati ... -
HBase 0.96配置 snappy(绝对有效哦亲)
2014-02-12 14:10 3986通常情况下,snappy压缩算法无非是hbase 最好的伴侣, ... -
HBase Memstore flush代码阅读笔记-2 -由 XXX 触发的 flush
2014-01-22 18:44 0之前看到在执行 mutate 操作之前,RS 会检查 mems ... -
HBase Memstore flush代码阅读笔记-1 -由 lowerlimit 和 upperlimit 触发的 flush
2014-01-22 18:34 1774在写请求(put,delete)到达服务端时,服务端(HReg ... -
HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush
2014-01-23 15:15 3517本代码基于0.96.1.1:http://svn.apache ... -
HBase Memstore配置
2014-01-21 11:09 440... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.96-hadoop2)
2013-11-11 19:13 834看看MultiServerCallable的核心方法,call ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)
2013-11-08 19:54 1991按照94的阅读进度,这里该看如何定位RS和Region了 先回 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.94.12)
2013-11-08 12:55 163上一篇把提交任务的主流程整理了下,遗留了连接、发送请求、处理响 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)
2013-11-07 19:44 1049终于把RS的定位问题搞清楚了些些,时间不等人,马上看看conn ... -
HBase Memstore配置
2013-11-07 15:47 2847HBase Memstore配置 本文为翻译,原英文地址:ht ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
2013-11-07 17:01 193上一篇http://dennis-lee-gammy.itey ... -
HBASE 代码阅读笔记-1 - PUT-1-主流程(基于0.94.12)
2013-11-06 19:57 342最近闲来无事看看hbase ...
相关推荐
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3....
赠送jar包:hbase-hadoop-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3....
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-...
赠送jar包:hbase-hadoop-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.4.3....
赠送jar包:hbase-hadoop-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-...
extclassgenerator.zip,为Ext JS和Sencha TouchExt JS代码生成器创建Ext JavaScript类的生成器。从java类创建模型js类
hbase-hbck2-1.1.0-SNAPSHOT.jar
tapestry-security.zip,基于shiro security的tapestry 5的tynamo安全包tapestry security是基于apache shiro的apache tapestry 5的安全模块
HBase(hbase-2.4.9-bin.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System...
Java操作hbase完成hbase数据文件下载
ycsb-hbase14-binding-0.17.0
hbase_0.98.13-hadoop2-bin.tar.gz的相关包。希望能解决你们的问题。
hbase-0.98.12.1-hadoop1-bin.tar.gz编译后安装包,下载解压可直接使用。
该资源为java客户端连接hbase集群,在windows客户端配置hadoop环境所需要用到的工具类,有需自取
hbase-0.98.17-hadoop2-bin.tar.gz