亚洲全黄无码一级在线看_国产剧情久久久性色_无码av一区二区三区无码_亚洲成a×人片在线观看

當(dāng)前位置: 首頁 > 科技新聞 >

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

時間:2020-06-04 17:39來源:網(wǎng)絡(luò)整理 瀏覽:
前言上一章節(jié)我提到了基于zk分布式鎖的實(shí)現(xiàn),這章節(jié)就來說一下基于Redis的分布式鎖實(shí)現(xiàn)吧。zk實(shí)現(xiàn)分布式鎖的傳送門:zk分布式鎖在開始提到
前言

上一章節(jié)我提到了基于zk分布式鎖的實(shí)現(xiàn),這章節(jié)就來說一下基于Redis的分布式鎖實(shí)現(xiàn)吧。

zk實(shí)現(xiàn)分布式鎖的傳送門:zk分布式鎖

在開始提到Redis分布式鎖之前,我想跟大家聊點(diǎn)Redis的基礎(chǔ)知識。

說一下Redis的兩個命令:

SETNX key value

setnx 是SET if Not eXists(如果不存在,則 SET)的簡寫。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

用法如圖,如果不存在set成功返回int的1,這個key存在了返回0。

SETEX key seconds value

將值 value 關(guān)聯(lián)到 key ,并將 key 的生存時間設(shè)為 seconds (以秒為單位)。

如果 key 已經(jīng)存在,setex命令將覆寫舊值。

有小伙伴肯定會疑惑萬一set value 成功 set time失敗,那不就傻了么,這啊Redis官網(wǎng)想到了。

setex是一個原子性(atomic)操作,關(guān)聯(lián)值和設(shè)置生存時間兩個動作會在同一時間內(nèi)完成。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我設(shè)置了10秒的失效時間,ttl命令可以查看倒計(jì)時,負(fù)的說明已經(jīng)到期了。

跟大家講這兩個命名也是有原因的,因?yàn)樗麄兪荝edis實(shí)現(xiàn)分布式鎖的關(guān)鍵。

正文

開始前還是看看場景:

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我依然是創(chuàng)建了很多個線程去扣減庫存inventory,不出意外的庫存扣減順序變了,最終的結(jié)果也是不對的。

單機(jī)加synchronized或者Lock這些常規(guī)操作我就不說了好吧,結(jié)果肯定是對的。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

我先實(shí)現(xiàn)一個簡單的Redis鎖,然后我們再實(shí)現(xiàn)分布式鎖,可能更方便大家的理解。

還記得上面我說過的命令么,實(shí)現(xiàn)一個單機(jī)的其實(shí)比較簡單,你們先思考一下,別往下看。

setnx阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

可以看到,第一個成功了,沒釋放鎖,后面的都失敗了,至少順序問題問題是解決了,只要加鎖,縮放后面的拿到,釋放如此循環(huán),就能保證按照順序執(zhí)行。

但是你們也發(fā)現(xiàn)問題了,還是一樣的,第一個仔set成功了,但是突然掛了,那鎖就一直在那無法得到釋放,后面的線程也永遠(yuǎn)得不到鎖,又死鎖了。

所以....

setex

知道我之前說這個命令的原因了吧,設(shè)置一個過期時間,就算線程1掛了,也會在失效時間到了,自動釋放。

我這里就用到了nx和px的結(jié)合參數(shù),就是set值并且加了過期時間,這里我還設(shè)置了一個過期時間,就是這時間內(nèi)如果第二個沒拿到第一個的鎖,就退出阻塞了,因?yàn)榭赡苁强蛻舳藬噙B了。

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

加鎖

整體加鎖的邏輯比較簡單,大家基本上都能看懂,不過我拿到當(dāng)前時間去減開始時間的操作感覺有點(diǎn)笨, System.currentTimeMillis()消耗很大的。

/**
*加鎖
*
*@paramid
*@return
*/
publicbooleanlock(Stringid){
Longstart=System.currentTimeMillis();
try{
for(;;){
//SET命令返回OK,則證明獲取鎖成功
Stringlock=jedis.set(LOCK_KEY,id,params);
if("OK".equals(lock)){
returntrue;
}
//否則循環(huán)等待,在timeout時間內(nèi)仍未獲取到鎖,則獲取失敗
longl=System.currentTimeMillis()-start;
if(l>=timeout){
returnfalse;
}
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}finally{
jedis.close();
}
}

System.currentTimeMillis消耗大,每個線程進(jìn)來都這樣,我之前寫代碼,就會在服務(wù)器啟動的時候,開一個線程不斷去拿,調(diào)用方直接獲取值就好了,不過也不是最優(yōu)解,日期類還是有很多好方法的。

@Service
publicclassTimeServcie{
privatestaticlongtime;
static{
newThread(newRunnable(){
@Override
publicvoidrun(){
while(true){
try{
Thread.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
longcur=System.currentTimeMillis();
setTime(cur);
}
}
}).start();
}

publicstaticlonggetTime(){
returntime;
}

publicstaticvoidsetTime(longtime){
TimeServcie.time=time;
}
}
解鎖

解鎖的邏輯更加簡單,就是一段Lua的拼裝,把Key做了刪除。

你們發(fā)現(xiàn)沒,我上面加鎖解鎖都用了UUID,這就是為了保證,誰加鎖了誰解鎖,要是你刪掉了我的鎖,那不亂套了嘛。

LUA是原子性的,也比較簡單,就是判斷一下Key和我們參數(shù)是否相等,是的話就刪除,返回成功1,0就是失敗。

/**
*解鎖
*
*@paramid
*@return
*/
publicbooleanunlock(Stringid){
Stringscript=
"ifredis.call('get',KEYS[1])==ARGV[1]then"+
"returnredis.call('del',KEYS[1])"+
"else"+
"return0"+
"end";
try{
Stringresult=jedis.eval(script,Collections.singletonList(LOCK_KEY),Collections.singletonList(id)).toString();
return"1".equals(result)?true:false;
}finally{
jedis.close();
}
}
驗(yàn)證

我們可以用我們寫的Redis鎖試試效果,可以看到都按照順序去執(zhí)行了

阿里面試:如何用Redis實(shí)現(xiàn)分布式鎖?

思考

大家是不是覺得完美了,但是上面的鎖,有不少瑕疵的,我沒思考很多點(diǎn),你或許可以思考一下,源碼我都開源到我的GItHub了。

而且,鎖一般都是需要可重入行的,上面的線程都是執(zhí)行完了就釋放了,無法再次進(jìn)入了,進(jìn)去也是重新加鎖了,對于一個鎖的設(shè)計(jì)來說肯定不是很合理的。

我不打算手寫,因?yàn)槎加鞋F(xiàn)成的,別人幫我們寫好了。

redisson

redisson的鎖,就實(shí)現(xiàn)了可重入了,但是他的源碼比較晦澀難懂。

使用起來很簡單,因?yàn)樗麄兊讓佣挤庋b好了,你連接上你的Redis客戶端,他幫你做了我上面寫的一切,然后更完美。

簡單看看他的使用吧,跟正常使用Lock沒啥區(qū)別。

ThreadPoolExecutorthreadPoolExecutor=
newThreadPoolExecutor(inventory,inventory,10L,SECONDS,linkedBlockingQueue);
longstart=System.currentTimeMillis();
Configconfig=newConfig();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
finalRedissonClientclient=Redisson.create(config);
finalRLocklock=client.getLock("lock1");

for(inti=0;i<=NUM;i++){
threadPoolExecutor.execute(newRunnable(){
publicvoidrun(){
lock.lock();
inventory--;
System.out.println(inventory);
lock.unlock();
}
});
}
longend=System.currentTimeMillis();
System.out.println("執(zhí)行線程數(shù):"+NUM+"總耗時:"+(end-start)+"庫存數(shù)為:"+inventory);

上面可以看到我用到了getLock,其實(shí)就是獲取一個鎖的實(shí)例。

RedissionLock也沒做啥,就是熟悉的初始化。

publicRLockgetLock(Stringname){
returnnewRedissonLock(connectionManager.getCommandExecutor(),name);
}

publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){
super(commandExecutor,name);
//命令執(zhí)行器
this.commandExecutor=commandExecutor;
//UUID字符串
this.id=commandExecutor.getConnectionManager().getId();
//內(nèi)部鎖過期時間
this.internalLockLeaseTime=commandExecutor.
getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName=id+":"+name;
}
加鎖

有沒有發(fā)現(xiàn)很多跟Lock很多相似的地方呢?

嘗試加鎖,拿到當(dāng)前線程,然后我開頭說的ttl也看到了,是不是一切都是那么熟悉?

publicvoidlockInterruptibly(longleaseTime,TimeUnitunit)throwsInterruptedException{

//當(dāng)前線程ID
longthreadId=Thread.currentThread().getId();
//嘗試獲取鎖
Longttl=tryAcquire(leaseTime,unit,threadId);
//如果ttl為空,則證明獲取鎖成功
if(ttl==null){
return;
}
//如果獲取鎖失敗,則訂閱到對應(yīng)這個鎖的channel
RFuture<RedissonLockEntry>future=subscribe(threadId);
commandExecutor.syncSubscription(future);

try{
while(true){
//再次嘗試獲取鎖
ttl=tryAcquire(leaseTime,unit,threadId);
//ttl為空,說明成功獲取鎖,返回
if(ttl==null){
break;
}
//ttl大于0則等待ttl時間后繼續(xù)嘗試獲取
if(ttl>=0){
getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
}else{
getEntry(threadId).getLatch().acquire();
}
}
}finally{
//取消對channel的訂閱
unsubscribe(future,threadId);
}
//get(lockAsync(leaseTime,unit));
}
獲取鎖

獲取鎖的時候,也比較簡單,你可以看到,他也是不斷刷新過期時間,跟我上面不斷去拿當(dāng)前時間,校驗(yàn)過期是一個道理,只是我比較粗糙。

private<T>RFuture<Long>tryAcquireAsync(longleaseTime,TimeUnitunit,finallongthreadId){

//如果帶有過期時間,則按照普通方式獲取鎖
if(leaseTime!=-1){
returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);
}

//先按照30秒的過期時間來執(zhí)行獲取鎖的方法
RFuture<Long>ttlRemainingFuture=tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);

//如果還持有這個鎖,則開啟定時任務(wù)不斷刷新該鎖的過期時間
ttlRemainingFuture.addListener(newFutureListener<Long>(){
@Override
publicvoidoperationComplete(Future<Long>future)throwsException{
if(!future.isSuccess()){
return;
}

LongttlRemaining=future.getNow();
//lockacquired
if(ttlRemaining==null){
scheduleExpirationRenewal(threadId);
}
}
});
returnttlRemainingFuture;
}
底層加鎖邏輯

你可能會想這么多操作,在一起不是原子性不還是有問題么?

大佬們肯定想得到呀,所以還是LUA,他使用了Hash的數(shù)據(jù)結(jié)構(gòu)。

主要是判斷鎖是否存在,存在就設(shè)置過期時間,如果鎖已經(jīng)存在了,那對比一下線程,線程是一個那就證明可以重入,鎖在了,但是不是當(dāng)前線程,證明別人還沒釋放,那就把剩余時間返回,加鎖失敗。

是不是有點(diǎn)繞,多理解一遍。

<T>RFuture<T>tryLockInnerAsync(longleaseTime,TimeUnitunit,
longthreadId,RedisStrictCommand<T>command){

//過期時間
internalLockLeaseTime=unit.toMillis(leaseTime);

returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,command,
//如果鎖不存在,則通過hset設(shè)置它的值,并設(shè)置過期時間
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('hset',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,并且鎖的是當(dāng)前線程,則通過hincrby給數(shù)值遞增1
"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+
"redis.call('hincrby',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,但并非本線程,則返回過期時間ttl
"returnredis.call('pttl',KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime,getLockName(threadId));
}
解鎖

鎖的釋放主要是publish釋放鎖的信息,然后做校驗(yàn),一樣會判斷是否當(dāng)前線程,成功就釋放鎖,還有個hincrby遞減的操作,鎖的值大于0說明是可重入鎖,那就刷新過期時間。

如果值小于0了,那刪掉Key釋放鎖。

是不是又和AQS很像了?

AQS就是通過一個volatile修飾status去看鎖的狀態(tài),也會看數(shù)值判斷是否是可重入的。

所以我說代碼的設(shè)計(jì),最后就萬劍歸一,都是一樣的。

publicRFuture<Void>unlockAsync(finallongthreadId){
finalRPromise<Void>result=newRedissonPromise<Void>();

//解鎖方法
RFuture<Boolean>future=unlockInnerAsync(threadId);

future.addListener(newFutureListener<Boolean>(){
@Override
publicvoidoperationComplete(Future<Boolean>future)throwsException{
if(!future.isSuccess()){
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//獲取返回值
BooleanopStatus=future.getNow();
//如果返回空,則證明解鎖的線程和當(dāng)前鎖不是同一個線程,拋出異常
if(opStatus==null){
IllegalMonitorStateExceptioncause=
newIllegalMonitorStateException("
attempttounlocklock,notlockedbycurrentthreadbynodeid:"
+id+"thread-id:"+threadId);
result.tryFailure(cause);
return;
}
//解鎖成功,取消刷新過期時間的那個定時任務(wù)
if(opStatus){
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});

returnresult;
}


protectedRFuture<Boolean>unlockInnerAsync(longthreadId){
returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,EVAL,

//如果鎖已經(jīng)不存在,發(fā)布鎖釋放的消息
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
//如果釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null
"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+
"returnnil;"+
"end;"+
//通過hincrby遞減1的方式,釋放一次鎖
//若剩余次數(shù)大于0,則刷新過期時間
"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+
"if(counter>0)then"+
"redis.call('pexpire',KEYS[1],ARGV[2]);"+
"return0;"+
//否則證明鎖已經(jīng)釋放,刪除key并發(fā)布鎖釋放的消息
"else"+
"redis.call('del',KEYS[1]);"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
"returnnil;",
Arrays.<Object>asList(getName(),getChannelName()),
LockPubSub.unlockMessage,internalLockLeaseTime,getLockName(threadId));

}
總結(jié)

這個寫了比較久,但是不是因?yàn)閺?fù)雜什么的,是因?yàn)閭€人工作的原因,最近事情很多嘛,還是那句話,程序員才是我的本職寫文章只是個愛好,不能本末倒置了。

大家會發(fā)現(xiàn),你學(xué)懂一個技術(shù)棧之后,學(xué)新的會很快,而且也能發(fā)現(xiàn)他們的設(shè)計(jì)思想和技巧真的很巧妙,也總能找到相似點(diǎn),和讓你驚嘆的點(diǎn)。

就拿Doug Lea寫的AbstractQueuedSynchronizer(AQS)來說,他寫了一行代碼,你可能看幾天才能看懂,大佬們的思想是真的牛。

我看源碼有時候也頭疼,但是去谷歌一下,自己理解一下,突然恍然大悟的時候覺得一切又很值。

學(xué)習(xí)就是一條時而郁郁寡歡,時而開環(huán)大笑的路,大家加油,我們成長路上一起共勉。

我是敖丙,一個在互聯(lián)網(wǎng)茍且偷生的工具人。

最好的關(guān)系是互相成就,大家的**「三連」**就是丙丙創(chuàng)作的最大動力,我們下期見!

注:如果本篇博客有任何錯誤和建議,歡迎人才們留言,你快說句話啊

你知道的越多,你不知道的越多

推薦內(nèi)容