上一章節(jié)我提到了基于zk分布式鎖的實(shí)現(xiàn),這章節(jié)就來說一下基于Redis的分布式鎖實(shí)現(xiàn)吧。
zk實(shí)現(xiàn)分布式鎖的傳送門:zk分布式鎖在開始提到Redis分布式鎖之前,我想跟大家聊點(diǎn)Redis的基礎(chǔ)知識。
說一下Redis的兩個命令:
SETNX key value
setnx 是SET if Not eXists(如果不存在,則 SET)的簡寫。
用法如圖,如果不存在set成功返回int的1,這個key存在了返回0。
SETEX key seconds value
將值 value 關(guān)聯(lián)到 key ,并將 key 的生存時間設(shè)為 seconds (以秒為單位)。
如果 key 已經(jīng)存在,setex命令將覆寫舊值。
有小伙伴肯定會疑惑萬一set value 成功 set time失敗,那不就傻了么,這啊Redis官網(wǎng)想到了。
setex是一個原子性(atomic)操作,關(guān)聯(lián)值和設(shè)置生存時間兩個動作會在同一時間內(nèi)完成。
我設(shè)置了10秒的失效時間,ttl命令可以查看倒計(jì)時,負(fù)的說明已經(jīng)到期了。
跟大家講這兩個命名也是有原因的,因?yàn)樗麄兪荝edis實(shí)現(xiàn)分布式鎖的關(guān)鍵。
正文開始前還是看看場景:
我依然是創(chuàng)建了很多個線程去扣減庫存inventory,不出意外的庫存扣減順序變了,最終的結(jié)果也是不對的。
單機(jī)加synchronized或者Lock這些常規(guī)操作我就不說了好吧,結(jié)果肯定是對的。
我先實(shí)現(xiàn)一個簡單的Redis鎖,然后我們再實(shí)現(xiàn)分布式鎖,可能更方便大家的理解。
還記得上面我說過的命令么,實(shí)現(xiàn)一個單機(jī)的其實(shí)比較簡單,你們先思考一下,別往下看。
setnx可以看到,第一個成功了,沒釋放鎖,后面的都失敗了,至少順序問題問題是解決了,只要加鎖,縮放后面的拿到,釋放如此循環(huán),就能保證按照順序執(zhí)行。
但是你們也發(fā)現(xiàn)問題了,還是一樣的,第一個仔set成功了,但是突然掛了,那鎖就一直在那無法得到釋放,后面的線程也永遠(yuǎn)得不到鎖,又死鎖了。
所以....
setex知道我之前說這個命令的原因了吧,設(shè)置一個過期時間,就算線程1掛了,也會在失效時間到了,自動釋放。
我這里就用到了nx和px的結(jié)合參數(shù),就是set值并且加了過期時間,這里我還設(shè)置了一個過期時間,就是這時間內(nèi)如果第二個沒拿到第一個的鎖,就退出阻塞了,因?yàn)榭赡苁强蛻舳藬噙B了。
加鎖整體加鎖的邏輯比較簡單,大家基本上都能看懂,不過我拿到當(dāng)前時間去減開始時間的操作感覺有點(diǎn)笨, System.currentTimeMillis()消耗很大的。
/**
*加鎖
*
*@paramid
*@return
*/
publicbooleanlock(Stringid){
Longstart=System.currentTimeMillis();
try{
for(;;){
//SET命令返回OK,則證明獲取鎖成功
Stringlock=jedis.set(LOCK_KEY,id,params);
if("OK".equals(lock)){
returntrue;
}
//否則循環(huán)等待,在timeout時間內(nèi)仍未獲取到鎖,則獲取失敗
longl=System.currentTimeMillis()-start;
if(l>=timeout){
returnfalse;
}
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}finally{
jedis.close();
}
}
System.currentTimeMillis消耗大,每個線程進(jìn)來都這樣,我之前寫代碼,就會在服務(wù)器啟動的時候,開一個線程不斷去拿,調(diào)用方直接獲取值就好了,不過也不是最優(yōu)解,日期類還是有很多好方法的。
@Service
publicclassTimeServcie{
privatestaticlongtime;
static{
newThread(newRunnable(){
@Override
publicvoidrun(){
while(true){
try{
Thread.sleep(5);
}catch(InterruptedExceptione){
e.printStackTrace();
}
longcur=System.currentTimeMillis();
setTime(cur);
}
}
}).start();
}
publicstaticlonggetTime(){
returntime;
}
publicstaticvoidsetTime(longtime){
TimeServcie.time=time;
}
}
解鎖解鎖的邏輯更加簡單,就是一段Lua的拼裝,把Key做了刪除。
你們發(fā)現(xiàn)沒,我上面加鎖解鎖都用了UUID,這就是為了保證,誰加鎖了誰解鎖,要是你刪掉了我的鎖,那不亂套了嘛。
LUA是原子性的,也比較簡單,就是判斷一下Key和我們參數(shù)是否相等,是的話就刪除,返回成功1,0就是失敗。
/**
*解鎖
*
*@paramid
*@return
*/
publicbooleanunlock(Stringid){
Stringscript=
"ifredis.call('get',KEYS[1])==ARGV[1]then"+
"returnredis.call('del',KEYS[1])"+
"else"+
"return0"+
"end";
try{
Stringresult=jedis.eval(script,Collections.singletonList(LOCK_KEY),Collections.singletonList(id)).toString();
return"1".equals(result)?true:false;
}finally{
jedis.close();
}
}
驗(yàn)證我們可以用我們寫的Redis鎖試試效果,可以看到都按照順序去執(zhí)行了
思考大家是不是覺得完美了,但是上面的鎖,有不少瑕疵的,我沒思考很多點(diǎn),你或許可以思考一下,源碼我都開源到我的GItHub了。
而且,鎖一般都是需要可重入行的,上面的線程都是執(zhí)行完了就釋放了,無法再次進(jìn)入了,進(jìn)去也是重新加鎖了,對于一個鎖的設(shè)計(jì)來說肯定不是很合理的。
我不打算手寫,因?yàn)槎加鞋F(xiàn)成的,別人幫我們寫好了。
redissonredisson的鎖,就實(shí)現(xiàn)了可重入了,但是他的源碼比較晦澀難懂。
使用起來很簡單,因?yàn)樗麄兊讓佣挤庋b好了,你連接上你的Redis客戶端,他幫你做了我上面寫的一切,然后更完美。
簡單看看他的使用吧,跟正常使用Lock沒啥區(qū)別。
ThreadPoolExecutorthreadPoolExecutor=
newThreadPoolExecutor(inventory,inventory,10L,SECONDS,linkedBlockingQueue);
longstart=System.currentTimeMillis();
Configconfig=newConfig();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
finalRedissonClientclient=Redisson.create(config);
finalRLocklock=client.getLock("lock1");
for(inti=0;i<=NUM;i++){
threadPoolExecutor.execute(newRunnable(){
publicvoidrun(){
lock.lock();
inventory--;
System.out.println(inventory);
lock.unlock();
}
});
}
longend=System.currentTimeMillis();
System.out.println("執(zhí)行線程數(shù):"+NUM+"總耗時:"+(end-start)+"庫存數(shù)為:"+inventory);
上面可以看到我用到了getLock,其實(shí)就是獲取一個鎖的實(shí)例。
RedissionLock也沒做啥,就是熟悉的初始化。
publicRLockgetLock(Stringname){
returnnewRedissonLock(connectionManager.getCommandExecutor(),name);
}
publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){
super(commandExecutor,name);
//命令執(zhí)行器
this.commandExecutor=commandExecutor;
//UUID字符串
this.id=commandExecutor.getConnectionManager().getId();
//內(nèi)部鎖過期時間
this.internalLockLeaseTime=commandExecutor.
getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName=id+":"+name;
}
加鎖有沒有發(fā)現(xiàn)很多跟Lock很多相似的地方呢?
嘗試加鎖,拿到當(dāng)前線程,然后我開頭說的ttl也看到了,是不是一切都是那么熟悉?
publicvoidlockInterruptibly(longleaseTime,TimeUnitunit)throwsInterruptedException{
//當(dāng)前線程ID
longthreadId=Thread.currentThread().getId();
//嘗試獲取鎖
Longttl=tryAcquire(leaseTime,unit,threadId);
//如果ttl為空,則證明獲取鎖成功
if(ttl==null){
return;
}
//如果獲取鎖失敗,則訂閱到對應(yīng)這個鎖的channel
RFuture<RedissonLockEntry>future=subscribe(threadId);
commandExecutor.syncSubscription(future);
try{
while(true){
//再次嘗試獲取鎖
ttl=tryAcquire(leaseTime,unit,threadId);
//ttl為空,說明成功獲取鎖,返回
if(ttl==null){
break;
}
//ttl大于0則等待ttl時間后繼續(xù)嘗試獲取
if(ttl>=0){
getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
}else{
getEntry(threadId).getLatch().acquire();
}
}
}finally{
//取消對channel的訂閱
unsubscribe(future,threadId);
}
//get(lockAsync(leaseTime,unit));
}
獲取鎖獲取鎖的時候,也比較簡單,你可以看到,他也是不斷刷新過期時間,跟我上面不斷去拿當(dāng)前時間,校驗(yàn)過期是一個道理,只是我比較粗糙。
private<T>RFuture<Long>tryAcquireAsync(longleaseTime,TimeUnitunit,finallongthreadId){
//如果帶有過期時間,則按照普通方式獲取鎖
if(leaseTime!=-1){
returntryLockInnerAsync(leaseTime,unit,threadId,RedisCommands.EVAL_LONG);
}
//先按照30秒的過期時間來執(zhí)行獲取鎖的方法
RFuture<Long>ttlRemainingFuture=tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);
//如果還持有這個鎖,則開啟定時任務(wù)不斷刷新該鎖的過期時間
ttlRemainingFuture.addListener(newFutureListener<Long>(){
@Override
publicvoidoperationComplete(Future<Long>future)throwsException{
if(!future.isSuccess()){
return;
}
LongttlRemaining=future.getNow();
//lockacquired
if(ttlRemaining==null){
scheduleExpirationRenewal(threadId);
}
}
});
returnttlRemainingFuture;
}
底層加鎖邏輯你可能會想這么多操作,在一起不是原子性不還是有問題么?
大佬們肯定想得到呀,所以還是LUA,他使用了Hash的數(shù)據(jù)結(jié)構(gòu)。
主要是判斷鎖是否存在,存在就設(shè)置過期時間,如果鎖已經(jīng)存在了,那對比一下線程,線程是一個那就證明可以重入,鎖在了,但是不是當(dāng)前線程,證明別人還沒釋放,那就把剩余時間返回,加鎖失敗。
是不是有點(diǎn)繞,多理解一遍。
<T>RFuture<T>tryLockInnerAsync(longleaseTime,TimeUnitunit,
longthreadId,RedisStrictCommand<T>command){
//過期時間
internalLockLeaseTime=unit.toMillis(leaseTime);
returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,command,
//如果鎖不存在,則通過hset設(shè)置它的值,并設(shè)置過期時間
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('hset',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,并且鎖的是當(dāng)前線程,則通過hincrby給數(shù)值遞增1
"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+
"redis.call('hincrby',KEYS[1],ARGV[2],1);"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"returnnil;"+
"end;"+
//如果鎖已存在,但并非本線程,則返回過期時間ttl
"returnredis.call('pttl',KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime,getLockName(threadId));
}
解鎖鎖的釋放主要是publish釋放鎖的信息,然后做校驗(yàn),一樣會判斷是否當(dāng)前線程,成功就釋放鎖,還有個hincrby遞減的操作,鎖的值大于0說明是可重入鎖,那就刷新過期時間。
如果值小于0了,那刪掉Key釋放鎖。
是不是又和AQS很像了?
AQS就是通過一個volatile修飾status去看鎖的狀態(tài),也會看數(shù)值判斷是否是可重入的。
所以我說代碼的設(shè)計(jì),最后就萬劍歸一,都是一樣的。
publicRFuture<Void>unlockAsync(finallongthreadId){
finalRPromise<Void>result=newRedissonPromise<Void>();
//解鎖方法
RFuture<Boolean>future=unlockInnerAsync(threadId);
future.addListener(newFutureListener<Boolean>(){
@Override
publicvoidoperationComplete(Future<Boolean>future)throwsException{
if(!future.isSuccess()){
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//獲取返回值
BooleanopStatus=future.getNow();
//如果返回空,則證明解鎖的線程和當(dāng)前鎖不是同一個線程,拋出異常
if(opStatus==null){
IllegalMonitorStateExceptioncause=
newIllegalMonitorStateException("
attempttounlocklock,notlockedbycurrentthreadbynodeid:"
+id+"thread-id:"+threadId);
result.tryFailure(cause);
return;
}
//解鎖成功,取消刷新過期時間的那個定時任務(wù)
if(opStatus){
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
returnresult;
}
protectedRFuture<Boolean>unlockInnerAsync(longthreadId){
returncommandExecutor.evalWriteAsync(getName(),LongCodec.INSTANCE,EVAL,
//如果鎖已經(jīng)不存在,發(fā)布鎖釋放的消息
"if(redis.call('exists',KEYS[1])==0)then"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
//如果釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null
"if(redis.call('hexists',KEYS[1],ARGV[3])==0)then"+
"returnnil;"+
"end;"+
//通過hincrby遞減1的方式,釋放一次鎖
//若剩余次數(shù)大于0,則刷新過期時間
"localcounter=redis.call('hincrby',KEYS[1],ARGV[3],-1);"+
"if(counter>0)then"+
"redis.call('pexpire',KEYS[1],ARGV[2]);"+
"return0;"+
//否則證明鎖已經(jīng)釋放,刪除key并發(fā)布鎖釋放的消息
"else"+
"redis.call('del',KEYS[1]);"+
"redis.call('publish',KEYS[2],ARGV[1]);"+
"return1;"+
"end;"+
"returnnil;",
Arrays.<Object>asList(getName(),getChannelName()),
LockPubSub.unlockMessage,internalLockLeaseTime,getLockName(threadId));
}
總結(jié)這個寫了比較久,但是不是因?yàn)閺?fù)雜什么的,是因?yàn)閭€人工作的原因,最近事情很多嘛,還是那句話,程序員才是我的本職寫文章只是個愛好,不能本末倒置了。
大家會發(fā)現(xiàn),你學(xué)懂一個技術(shù)棧之后,學(xué)新的會很快,而且也能發(fā)現(xiàn)他們的設(shè)計(jì)思想和技巧真的很巧妙,也總能找到相似點(diǎn),和讓你驚嘆的點(diǎn)。
就拿Doug Lea寫的AbstractQueuedSynchronizer(AQS)來說,他寫了一行代碼,你可能看幾天才能看懂,大佬們的思想是真的牛。
我看源碼有時候也頭疼,但是去谷歌一下,自己理解一下,突然恍然大悟的時候覺得一切又很值。
學(xué)習(xí)就是一條時而郁郁寡歡,時而開環(huán)大笑的路,大家加油,我們成長路上一起共勉。
我是敖丙,一個在互聯(lián)網(wǎng)茍且偷生的工具人。
最好的關(guān)系是互相成就,大家的**「三連」**就是丙丙創(chuàng)作的最大動力,我們下期見!
注:如果本篇博客有任何錯誤和建議,歡迎人才們留言,你快說句話啊!
你知道的越多,你不知道的越多