谈disruptor的单线程数据库操作
作者:网络转载 发布时间:[ 2014/5/14 13:08:46 ] 推荐标签:单线程数据库
|
/**
* 静态类
* @return
*/
private DisruptorHelper(){ }
/**
* 初始化
*/
private void init(){
execute=new ExecutorService[group];
ringBuffer=new RingBuffer[group];
sequenceBarrier=new SequenceBarrier[group];
handler=new TaskEventHandler[group];
batchEventProcessor=new BatchEventProcessor[group];
////////////////定时执行////////////////
//初始化ringbuffer,存放Event
for(int i=0;i<group;i++){
ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
sequenceBarrier[i] = ringBuffer[i].newBarrier();
handler[i] = new TaskEventHandler();
batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);
ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
execute[i]= Executors.newSingleThreadExecutor();
execute[i].submit(instance.batchEventProcessor[i]);
}
this.taskTimer = Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));
inited = true;
}
/**
* 执行定时器
* @param tk
*/
private void produce(int index,Task tk){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return;
}
// if capacity less than 10%, don't use ringbuffer anymore
System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}else {
long sequence = ringBuffer[index].next();
//将状态报告存入ringBuffer的该序列号中
ringBuffer[index].get(sequence).setTask(tk);
//通知消费者该资源可以消费
ringBuffer[index].publish(sequence);
}
}
/**
* 获得容器的capacity的数量
* @param index
* @return
*/
private long remainingcapacity(int index){
//System.out.println("index:="+index);
if(index<0||index>=group) {
System.out.println("out of group index:="+index);
return 0L;
}
long capacity= ringBuffer[index].remainingCapacity();
return capacity;
}
private void shutdown0(){
for(int i=0;i<group;i++){
execute[i].shutdown();
}
}
////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////
/**
* 直接消费
* @param tk
*/
public static void addTask(int priority,Task tk){
instance.produce(priority,tk);
}
/**
* 定时消费
* @param tk
* @param delay
* @param period
*/
public static void scheduleTask(int priority,Task tk,long delay,long period){
Runnable timerTask = new ScheduledTask(priority, tk);
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
}
/**
* 定点执行
* @param tk
* @param hourse
* @param minus
* @param sec
* @return
*/
public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)
{
Runnable timerTask = new ScheduledTask(priority, tk);
//每天2:30分执行
long delay = Helper.calcDelay(hourse,minus,sec);
long period = Helper.ONE_DAY;
System.out.println("delay:"+(delay/1000)+"secs");
taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
return timerTask;
}
//对定时执行的程序进行分装
private static class ScheduledTask implements Runnable
{
private int priority;
private Task task;
ScheduledTask(int priority, Task task)
{
this.priority = priority;
this.task = task;
}
public void run()
{
try{
instance.produce(priority,task);
}catch(Exception e){
System.out.println("catch exception in DisruptorHelper!");
}
}
}
public static long getRemainingCapatiye(int index){
return instance.getRemainingCapatiye(index);
}
public static void shutdown(){
if(!inited){
throw new RuntimeException("Disruptor还没有初始化!");
}
instance.shutdown0();
}
}
|
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系SPASVO小编(021-61079698-8054),我们将立即处理,马上删除。
相关推荐
在测试数据库性能时,需要注意哪些方面的内容?测试管理工具TC数据库报错的原因有哪些?怎么解决?数据库的三大范式以及五大约束编程常用的几种时间戳转换(java .net 数据库)优化mysql数据库的几个步骤数据库并行读取和写入之Python实现深入理解数据库(DB2)缓冲池(BufferPool)国内三大云数据库测试对比预警即预防:6大常见数据库安全漏洞数据库规划、设计与管理数据库-事务的概念SQL Server修改数据库物理文件存在位置使用PHP与SQL搭建可搜索的加密数据库用Python写一个NoSQL数据库详述 SQL 中的数据库操作详述 SQL 中的数据库操作Java面试准备:数据库MySQL性能优化

sales@spasvo.com