select /*+parallel(t,25)+*/
成都创新互联专注为客户提供全方位的互联网综合服务,包含不限于网站制作、成都做网站、启东网络推广、重庆小程序开发、启东网络营销、启东企业策划、启东品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;成都创新互联为所有大学生创业者提供启东建站搭建服务,24小时服务热线:18980820575,官方网址:www.cdcxhl.com
一、Parallel
1. 用途
强行启用并行度来执行当前SQL。这个在Oracle 9i之后的版本可以使用,之前的版本现在没有环境进行测试。也就是说,加上这个说明,可以强行启用Oracle的多线程处理功能。举例的话,就像电脑装了多核的CPU,但大多情况下都不会完全多核同时启用(2核以上的比较明显),使用parallel说明,就会多核同时工作,来提高效率。
但本身启动这个功能,也是要消耗资源与性能的。所有,一般都会在返回记录数大于100万时使用,效果也会比较明显。
2. 语法
/*+parallel(table_short_name,cash_number)*/
这个可以加到insert、delete、update、select的后面来使用(和rule的用法差不多,有机会再分享rule的用法)
开启parallel功能的语句是:
alter session enable parallel dml;
这个语句是DML语句哦,如果在程序中用,用execute的方法打开。
3. 实例说明
用ERP中的transaction来说明下吧。这个table记录了所有的transaction,而且每天数据量也算相对比较大的(根据企业自身业务量而定)。假设我们现在要查看对比去年一年当中每月的进、销情况,所以,一般都会写成:
select to_char(transaction_date,'yyyymm') txn_month,
sum(
decode(
sign(transaction_quantity),1,transaction_quantity,0
)
) in_qty,
sum(
decode(
sign(transaction_quantity),-1,transaction_quantity,0
)
) out_qty
from mtl_material_transactions mmt
where transaction_date = add_months(
to_date(
to_char(sysdate,'yyyy')||'0101','yyyymmdd'),
-12)
and transaction_date = add_months(
to_date(
to_char(sysdate,'yyyy')||'1231','yyyymmdd'),
-12)
group by to_char(transaction_date,'yyyymm')
这个SQL执行起来,如果transaction_date上面有加index的话,效率还算过的去;但如果没有加index的话,估计就会半个小时内都执行不出来。这是就可以在select 后面加上parallel说明。例如:
select /*+parallel(mmt,10)*/
to_char(transaction_date,'yyyymm') txn_month,
...
这样的话,会大大提高执行效率。如果要将检索出来的结果insert到另一个表tmp_count_tab的话,也可以写成:
insert /*+parallel(t,10)*/
into tmp_count_tab
(
txn_month,
in_qty,
out_qty
)
select /*+parallel(mmt,10)*/
to_char(transaction_date,'yyyymm') txn_month,
...
插入的机制和检索机制差不多,所以,在insert后面加parallel也会加速的。关于insert机制,这里暂不说了。
Parallel后面的数字,越大,执行效率越高。不过,貌似跟server的配置还有oracle的配置有关,增大到一定值,效果就不明显了。所以,一般用8,10,12,16的比较常见。我试过用30,发现和16的效果一样。不过,数值越大,占用的资源也会相对增大的。如果是在一些package、function or procedure中写的话,还是不要写那么大,免得占用太多资源被DBA开K。
4. Parallel也可以用于多表
多表的话,就是在第一后面,加入其他的就可以了。具体写法如下:
/*+parallel(t,10) (b,10)*/
5. 小结
关于执行效率,建议还是多按照index的方法来提高效果。Oracle有自带的explan road的方法,在执行之前,先看下执行计划路线,对写好的SQL tuned之后再执行。实在没办法了,再用parallel方法。Parallel比较邪恶,对开发者而言,不是好东西,会养成不好习惯,导致很多bad SQL不会暴漏,SQL Tuning的能力得不到提升。我有见过某些人create table后,从不create index或primary key,认为写SQL时加parallel就可以了。
Oracle JOB实现多线程插入
Sql代码
--经测试,大数据量的插入,多线程在普通磁盘执行效率反而更慢,不如单insert语句,而在磁盘阵列硬件环境下执行效率有很大的提升。
--创建表,模拟多线程插入(TT3-TT4)
DROP TABLE TT3;
DROP TABLE TT4;
CREATE TABLE TT4 AS SELECT * FROM DBA_OBJECTS WHERE 1=0;
CREATE TABLE TT3 AS SELECT * FROM DBA_OBJECTS;
--数据分批插入参数表
DROP TABLE JOB_PARMS;
CREATE TABLE JOB_PARMS
(
JOB NUMBER PRIMARY KEY,
LO_RID INT,
HI_RID INT
);
--创建插入的存储过程
CREATE OR REPLACE PROCEDURE PROC_TEST(P_JOB IN NUMBER) IS
L_REC JOB_PARMS%ROWTYPE;
BEGIN
SELECT * INTO L_REC
FROM JOB_PARMS
WHERE JOB = P_JOB;
INSERT INTO TT4
SELECT A.OWNER,
A.OBJECT_NAME,
A.SUBOBJECT_NAME,
A.OBJECT_ID,
A.DATA_OBJECT_ID,
A.OBJECT_TYPE,
A.CREATED,
A.LAST_DDL_TIME,
A.TIMESTAMP,
A.STATUS,
A.TEMPORARY,
A.GENERATED,
A.SECONDARY
FROM (SELECT ROWNUM RN, TT3.* FROM TT3 WHERE ROWNUM = L_REC.HI_RID) A
WHERE A.RN = L_REC.LO_RID;
DELETE FROM JOB_PARMS WHERE JOB = P_JOB;
COMMIT;
END;
/
---DIY 并行调度程序块
DECLARE
L_JOB NUMBER;
C_INDEX NUMBER;--插入的数量总数
S_INDEX INT:=0;--插入的开始index
E_INDEX INT:=0;--插入的结束index
CQ_INDEX INT:=20;--循环的次数
NUM_INCREASE INT:=0;--增量累加
V_I INT:=0;--计数器
BEGIN
SELECT COUNT(*) INTO C_INDEX FROM TT3;
NUM_INCREASE:= CEIL(C_INDEX/CQ_INDEX);
WHILE CQ_INDEX V_I
LOOP
V_I:=V_I+1;
S_INDEX:=1+NUM_INCREASE*(V_I-1);
IF(V_I = 20) THEN--当等于循环次数则修改结束的index
E_INDEX:= C_INDEX;
ELSE
E_INDEX:=NUM_INCREASE*V_I;
END IF;
DBMS_JOB.SUBMIT( L_JOB, 'PROC_TEST(JOB);');
INSERT INTO JOB_PARMS(JOB, LO_RID, HI_RID)
VALUES ( L_JOB, S_INDEX, E_INDEX );
END LOOP;
END;
/
问题如下:
ORACLE的多线程体现在DML上 在操作时, 如果见到/* +*/ (平时写备注、评论块的/**/符号中有加号, 那么则表明了使用Oracle Hint. /*+ parallel(表名,并发数)*/ (有时候写作Append parallel,或者有时候直接写Append) .
从开发的角度看:
ORACLE多线程可以提高某些语句查询的速度(不是一定的,取决于你的核,和服务器, 我原本有一些材料可以图示进程数和速度的关系,可惜一时找不到, 如果需要可以再联系)。具体使用时, 做几个测试 看看速率提高多少。。
从数据库整体来看:
多线程并不是优化了你的查询速率, 而是使用了更多数据库的资源(其他用户或者进程的资源)换来你的语句速率的提高。 联系一下你的DBA, 因为很有可能你用了多进程后,从DBA的EM上会发现你资源在某时间段内用的很高,甚至会给出警告。
void* OracleProcess(GPS_DATA GpsRec) // 数据库数据处理
{
interval = 0;
struct HashItem* pHash;
pHash = inithashtable(MAX_REC2);
char sql[384] = {0};
char temp[256] = {0};
char tName[10] = {0}; // 表名字
int i,k;
int j = TotalRec RATE;
double distance;
for(i=0; i j; i++)
{
sprintf(temp,"%s%f%f%f%d",gps_last[i].tid,gps_last[i].lon,gps_last[i].lat,gps_last[i].speed,gps_last[i].udate);
InsertHash(temp, pHash, MAX_REC2); // 插入最后GPS信息到hash
memset(temp,0x00,256);
}
for(i = 0; i TotalRec; i++)
{
for(k=0; kj; k++) // 查询车机是否在册
if(strcmp(GpsRec[i].tid,tid[k]) == 0)
break;
if(k j)
{
if(GpsRec[i].udate != 0.00)
{
distance = InfoUpdate(GpsRec,i); // 最新GPS数据更新
sprintf(temp,"%s%f%f%f%d",GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].udate);
if(GetHashTablePos(temp, pHash, MAX_REC2) == -1) // 查找hash是否存在
{
if (distance 0.0001)
{
sprintf(tName,"GPS_%d_Y",tf[k]);
InsertHash(temp, pHash, MAX_REC2); // 插入
sprintf(sql,"insert into %s (id,tm_id,lon,lat, speed, utc_time, udate,mileage,DIRECTION,DISTANCE) values (seq_gps.nextVal,'%s','%f','%f','%f','%d','%d','%f','%d','%f','%d')",
tName,GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].utime,GpsRec[i].udate,GpsRec[i].mileage,GpsRec[i].dir,distance,interval);
printf("%s\n",sql);
oci_excu(oracle_env,(text *)sql,0); // 插入数据
memset(tName,0x00,10);
}
}
memset(sql,0x00,384);
memset(temp,0x00,256);
}
}
}
memset(GpsRec,0x00,sizeof(GpsRec));
free(pHash);
pthread_exit(NULL);
}
void TcpProcess(int tfd) // 处理TCP连接上的事务
{
struct timeval ntime;
int index = 0,times,ret;
int rlen = 0,rflag = 0;
char recvbuf[513] = {0};
bzero(recvbuf,513);
while(1)
{
ret = rlen = read(tfd,recvbuf,512);
if(rlen = 0)
break;
if((rlen%32) == 0) // 32长度为标准TCP信息
{
times = 0;
ret = 5;
while(ret--)
{
if(tflag[tfd] == tfd) // 已经存在的socket
{
LOVENIX *info = (LOVENIX *)malloc(sizeof(LOVENIX));
memset(info,0x00,sizeof(LOVENIX));
if(recvbuf[times] == 0x58 || recvbuf[times] == 0x59)
ProtocolAnalysisLovenixTcp(recvbuf[times],info);
else if(recvbuf[times] == 0x24)
ProtocolAnalysisLovenixUdp(recvbuf[times],info);
sprintf(info-tid,"%s",seq[tfd]); // 合成车辆ID
DataProcess(info); // 处理GPS数据
free(info);
gettimeofday(ntime, NULL);
cntime[tfd] = ntime.tv_sec; // 更新时间
times += 32;
}
}
}
else if(rlen 32)
{
if(!rflag)
{
if((index = RegLovenix(tfd,recvbuf)) -1)
{
sprintf(seq[tfd],"%s",tid[index]); // 将对应的socket设备ID保存
gettimeofday(ntime, NULL);
sfd[tfd] = tfd;
cntime[tfd] = ntime.tv_sec;
tflag[tfd] = tfd;
rflag = 1;
}
}
}
if(rlen 512); // 已经读完
break;
memset(recvbuf,0x00,rlen);
}
}
void *TcpServer(void *arg)
{
int port = (unsigned int) arg;
int efd,i;
struct timeval ntime;
int listener, nfds, n, listen_opt = 1, lisnum;
struct sockaddr_in my_addr, their_addr;
socklen_t len = sizeof(their_addr);
lisnum = MAXLISTEN;
for(i=0; iMAX_REC; i++)
{
sfd[i] = 0;
tflag[i] = 0;
}
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) // 开启 socket 监听
{
lprintf(lfd, FATAL, "TCP Socket error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP socket creat susscess!\n");
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (void *) listen_opt,(int) sizeof(listen_opt)); // 设置端口多重邦定
setnonblocking(listener);
bzero(my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) my_addr, sizeof(struct sockaddr)) == -1)
{
lprintf(lfd, FATAL, "TCP bind error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP bind susscess!\n");
if (listen(listener, lisnum) == -1)
{
lprintf(lfd, FATAL, "TCP listen error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP listen susscess!\n");
kdpfd = epoll_create(MAXEPOLLSIZE); // 创建 epoll句柄,把监听socket加入到epoll集合里
ev.events = EPOLLIN | EPOLLET; // 注册epoll 事件
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, ev) 0)
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
while (1)
{
sem_wait(sem_tcp); // 等待 sem_TCP
sem_wait(sem_tp); // 将tp值减一
nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, 1); // 等待有事件发生
if (nfds == -1)
lprintf(lfd, FATAL,"EPOLL_WAIT error!\n");
for (n = 0; n nfds; ++n) // 处理epoll所有事件
{
if (events[n].data.fd == listener) // 如果是连接事件
{
if ((efd = accept(listener, (struct sockaddr *) their_addr,len)) 0)
{
lprintf(lfd, FATAL, "accept error!\n");
continue;
}
else
lprintf(lfd, INFO, "Client from :%s\tSocket ID:%d\n", inet_ntoa(their_addr.sin_addr) ,efd);
setnonblocking(efd); // 设置新连接为非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 注册新连接
ev.data.fd = efd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, efd, ev) 0) // 将新连接加入EPOLL的监听队列
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
else
{
gettimeofday(ntime, NULL);
cntime[efd] = ntime.tv_sec;
sfd[efd] = efd;
}
}
else if (events[n].events EPOLLIN)
tpool_add_work(pool, TcpProcess, (void*)events[n].data.fd); // 读取分析TCP信息
else
{
close(events[n].data.fd);
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, ev);
}
}
sem_post(sem_cm);
sem_post(sem_udp);
}
close(listener);
}
int DataProcess(LOVENIX *info) // 处理GPS数据
{
if(sflag == 0 (CacheRec != TotalRec)) // 缓存1可用且没有满
{
gps_cache[CacheRec].lat = info-lat;
gps_cache[CacheRec].mileage = info-mileage;
gps_cache[CacheRec].lon = info-lon;
gps_cache[CacheRec].speed = atod(info-speed, strlen(info-speed))*0.514444444*3.6;
gps_cache[CacheRec].udate = atoi(info-udate);
gps_cache[CacheRec].utime = atoi(info-utime);
gps_cache[CacheRec].dir = atoi(info-dir);
sprintf(gps_cache[CacheRec].tid ,"%s",info-tid);
CacheRec++;
// printf("CacheRec %d\tTotalRec %d \t sflag:%d\n",CacheRec,TotalRec,sflag);
if(CacheRec == TotalRec)
{
sflag = 1;
pthread_attr_init(attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(thread, attr,(void*) OracleProcess,(void*)gps_cache)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
CacheRec = 0;
}
}
else if(sflag == 1 (Cache1Rec != TotalRec)) // 缓存2可用且没有满
{
gps_cache1[Cache1Rec].mileage = info-mileage;
gps_cache1[Cache1Rec].lat = info-lat;
gps_cache1[Cache1Rec].lon = info-lon;
gps_cache1[Cache1Rec].speed = atod(info-speed, strlen(info-speed))*0.514444444*3.6;
gps_cache1[Cache1Rec].udate = atoi(info-udate);
gps_cache1[Cache1Rec].utime = atoi(info-utime);
gps_cache1[Cache1Rec].dir = atoi(info-dir);
sprintf(gps_cache1[Cache1Rec].tid ,"%s",info-tid);
Cache1Rec++;
if(Cache1Rec == TotalRec)
{
sflag = 0;
pthread_attr_init(attr); // 初始化属性值,均设为默认值
pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); // 设置线程为分离属性
if (pthread_create(thread, attr,(void*) OracleProcess,(void*)gps_cache1)) // 创建数据处理线程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
Cache1Rec = 0;
}
}
else
{
lprintf(lfd, FATAL, "No cache to use!\n");
return (0);
}
return (1);
}
windows里所有oracle事务统一由一个oracle.exe进程管理,pmon、smon等表现为oracle.exe内的线程。
linux系统里pmon、smon都是独立的进程。
线程是进程的组成部分,进程中的资源由多个线程共享。你可以把它们想象成:
进程是老大,手上有的是钞票、棍棒;
线程是他的小弟们,身无分文;
老大提供棍棒给小弟们出去办事,办事需要钱的时候由老大分配,小弟们抢来的钞票归老大统一支配。