java thread       

java 并发

c语言thread

创建线程

int pthread_create(pthread_t * tid, const pthread_attr_t * attr, void * ( * func) (void * ), void * arg);
其返回值是一个整数,若创建进程成功返回0,否则,返回其他错误代码,也是正整数。

结束线程

void pthread_exit (void *status);
参数是指针类型,用于存储线程结束后返回状态。

线程等待

int pthread_join (pthread_t tid, void ** status);

第一个参数表示要等待的进程的id;
第二参数表示要等待的进程的返回状态,是个二级指针。

返回当前线程ID

pthread_t pthread_self (void);
用于返回当前进程的ID

制定线程变成分裂状态

int pthread_detach (pthread_t tid);
参数是指定线程的ID,指定的ID的线程变成分离状态;若指定线程是分离状态,则 如果线程退出,那么它所有的资源都将释放,如果线程不是分离状态,线程必须保留它的线程ID、退出状态,直到其他线程对他调用的pthread_join()函数

c语言锁

互斥锁
pthread_mutex_t mutex 锁对象
pthread_mutex_init(&mutex,NULL) 在主线程中初始化锁为解锁状态
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 编译时初始化锁位解锁状态
pthread_mutex_lock(&mutex): 访问临界区加锁操作
pthread_mutex_unlock(&mutex): 访问临界区解锁操作

条件变量
pthread_cond_init
pthread_cond_destroy
pthread_cond_wait
pthread_cond_timedwait
pthread_cond_signal
pthread_cond_broadcast

信号量
int sem_init(sem_t * sem, int pshared, unsigned int value);初始化信号量
int sem_wait(sem_t *sem); 信号量减一操作,有线程申请资源
int sem_post(sem_t *sem);信号量加一操作,有线程释放资源
int sem_destroy(sem_t *sem); 销毁信号量。

thread

中断线程

public void run() {
    while (!Thread.currentThread().isInterrupted()&& more work to do) {
        try {
            ...
            sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();//重新设置中断标示
        }
    }
}

抛出InterruptedException异常后,中断标示位会自动清除,所以要重新设置中断表示

https://www.cnblogs.com/onlywujun/p/3565082.html

指令重排

public class ThreadExample {
    static Thread thread = null;
    int i;
    public volatile static boolean runing = true;

    public static void main(String[] args) throws InterruptedException {
        traditional();
        Thread.sleep(100);
        runing = false;


    }


    public static void traditional() {
        thread = new Thread() {
            @Override
            public void run() {
               while (runing){           
                 //i++;
               }
            }
        };
        thread.start();
    }

public static void aa(){
        Integer i=1;
        i.toString();

    }
}




traditional被重排,running变成函数变量
public static void traditional() {
        bool temp = running;
        thread = new Thread() {
            @Override
            public void run() {
               while (temp){           
                 //i++;
               }
            }
        };
        thread.start();
    }


所以线程不会被中断,一直再运行

java并发编程基础

通用线程生命周期

image-20210426080154467

Java线程生命周期

这五种状态在不同编程语言里会有简化合并。例如,C 语言的 POSIX Threads 规范,就把初始状态和可运行状态合并了;Java 语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了;细化休眠状态

image-20210426080228715

unsafe

rt.jar包的Unsafe类,提供硬件级别的原子性操作

public native int getInt(Object var1, long var2);

public native void putInt(Object var1, long var2, int var4);
等等

park unpark

底层jvm实现都是通过cas来实现原子性,park、unpark是通过条件变量来实现阻塞和唤醒

JUC里原子性操作AtomicInter、AtomicBoolean

juc的原子性操作都是基于Unsafe类实现

LockSupport

基于Unsafe类实现的同步工具,主要作用是挂起和唤醒线程,LockSupport.park、LockSupport.unpark

// LockSupport
public static void park(Object blocker) {  
    Thread t = Thread.currentThread();
    // blocker在什么对象上进行的阻塞操作
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

UNSAFE使用park和unpark进行线程的阻塞和唤醒操作,park和unpark底层是借助系统层(Linux下)方法pthread_mutexpthread_cond来实现的,通过pthread_cond_wait函数可以对一个线程进行阻塞操作,在这之前,必须先获取pthread_mutex,通过pthread_cond_signal函数对一个线程进行唤醒操作。

java lock

AQS

基于LockSupport实现的同步工具

实现一个双向队列,挂起的线程插入到队列的尾部

子类继承aqs,子类实现tryAcquire,用cas判断锁状态,成功则返回,失败则线程挂起并插入到aqs阻塞队列尾部(aqs实现)。子类实现tryRelease,设置锁没有被占用状态,设置成功返回true,否则返回false

ReentrantLock

public class ReentrantLock implements Lock, java.io.Serializable {
  private final Sync sync;
  abstract static class Sync extends AbstractQueuedSynchronizer {
    实现两个抽象函数
    tryAcquire()
      
    tryRelease()  
     
  }
}

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();


加锁调用流程
ReentrantLock: lock()
FairSync: lock()
AbstractQueuedSynchronizer: acquire(int arg)
ReentrantLock: tryAcquire(int acquires)
解锁调用流程
ReentrantLock: unlock()
AbstractQueuedSynchronizer: release(int arg)
Sync: tryAcquire(int releases)

synchronized(1.6之前都是重量级锁) (monitorenter jvm指令)

java虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现,无论是显式同步(有明确的monitorenter和monitorexit指令,即同步代码块)还是隐式同步都是如此。在Java语言中,同步用的最多的地方可能是被synchronized修饰的同步方法。同步方法并不是由monitorenter和monitorexit指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的ACC_SYNCHRONIZED标志来隐式实现的

当对象状态为偏向锁(biasable)时,mark word存储的是偏向的线程ID;当状态为轻量级锁(lightweight locked)时,mark word存储的是指向线程栈中Lock Record的指针;当状态为重量级锁(inflated)时,为指向堆中的monitor对象(ObjectMonitor)的指针。

当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列尾部,然后暂停当前线程。当持有锁的线程释放锁前,会将cxq中的所有元素移动到EntryList中去,并唤醒EntryList的队首线程。

如果一个线程在同步块中调用了Object#wait方法,会将该线程对应的ObjectWaiter从EntryList移除并加入到WaitSet中,然后释放锁。当wait的线程被notify之后,会将对应的ObjectWaiter从WaitSet移动到EntryList中。

设置对象头锁状态,通过cas_set_mark(atomic_cmpxchg_at)解决并发问题

atomic_cmpxchg_at(x, p, (ptrdiff_t)offset, e)
if x == *(p+offset) then *(p+offset) = e

同数据库的乐观锁操作,1.get old time value 2.update set new value where now value =old time value
每个对象都存在着一个monitor与之关联,用于重量级锁

    ObjectMonitor() {
        _header       = NULL;
        _count        = 0; //记录个数
        _waiters      = 0,
        _recursions   = 0;
        _object       = NULL;
        _owner        = NULL; //指向持有锁的线程
        _WaitSet      = NULL; //处于wait状态的线程,会被加入到_WaitSet
        _WaitSetLock  = 0 ;
        _Responsible  = NULL ;
        _succ         = NULL ;
        _cxq          = NULL ;
        FreeNext      = NULL ;
        _EntryList    = NULL ; //处于等待锁block状态的线程,会被加入到该列表
        _SpinFreq     = 0 ;
        _SpinClock    = 0 ;
        OwnerIsThread = 0 ;
    }

    线程运行到synchronized时候,会在栈上创建一个BasicObjectLock对象,即对象头里的Lock Record指针指向的地方

    class BasicObjectLock {
        friend class VMStructs;
        private:
        BasicLock _lock;                                    // the lock, must be double word aligned
        oop       _obj;                                     // object holds the lock;

        public:
        // Manipulation
        oop      obj() const                                { return _obj;  }
        void set_obj(oop obj)                               { _obj = obj; }
        BasicLock* lock()                                   { return &_lock; }

        // Note: Use frame::interpreter_frame_monitor_size() for the size of BasicObjectLocks
        //       in interpreter activation frames since it includes machine-specific padding.
        static int size()                                   { return sizeof(BasicObjectLock)/wordSize; }

        // GC support
        void oops_do(OopClosure* f) { f->do_oop(&_obj); }

        static int obj_offset_in_bytes()                    { return offset_of(BasicObjectLock, _obj);  }
        static int lock_offset_in_bytes()                   { return offset_of(BasicObjectLock, _lock); }
    };

    class BasicLock {
        friend class VMStructs;
        friend class JVMCIVMStructs;
        private:
        volatile markOop _displaced_header;
        public:
        markOop      displaced_header() const               { return _displaced_header; }
        void         set_displaced_header(markOop header)   { _displaced_header = header; }

        void print_on(outputStream* st) const;

        // move a basic lock (used during deoptimization
        void move_to(oop obj, BasicLock* dest);

        static int displaced_header_offset_in_bytes()       { return offset_of(BasicLock, _displaced_header); }
    };


    CASE(_monitorenter): {
        oop lockee = STACK_OBJECT(-1); //锁对象
        // derefing's lockee ought to provoke implicit null check
        CHECK_NULL(lockee);
        // find a free monitor or one already allocated for this object
        // if we find a matching object then we need a new monitor
        // since this is recursive enter
        BasicObjectLock* limit = istate->monitor_base();
        BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
        BasicObjectLock* entry = NULL;
        //找到一个空闲的Lock Record
        while (most_recent != limit ) {
          if (most_recent->obj() == NULL) entry = most_recent;
          else if (most_recent->obj() == lockee) break;
          most_recent++;
        }
        if (entry != NULL) {
          //设置持有该lock record的对象
          entry->set_obj(lockee);
          int success = false;
          uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;

          markOop mark = lockee->mark();
          intptr_t hash = (intptr_t) markOopDesc::no_hash;
          // implies UseBiasedLocking
          if (mark->has_bias_pattern()) {
            uintptr_t thread_ident;
            uintptr_t anticipated_bias_locking_value;
            thread_ident = (uintptr_t)istate->thread();
            anticipated_bias_locking_value =
              (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
              ~((uintptr_t) markOopDesc::age_mask_in_place);

            if  (anticipated_bias_locking_value == 0) {
              // already biased towards this thread, nothing to do
              if (PrintBiasedLockingStatistics) {
                (* BiasedLocking::biased_lock_entry_count_addr())++;
              }
              success = true;
            }
            else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
              // try revoke bias
              markOop header = lockee->klass()->prototype_header();
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              if (lockee->cas_set_mark(header, mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (*BiasedLocking::revoked_lock_entry_count_addr())++;
              }
            }
            else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
              // try rebias
              markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
              if (hash != markOopDesc::no_hash) {
                new_header = new_header->copy_set_hash(hash);
              }
              if (lockee->cas_set_mark(new_header, mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::rebiased_lock_entry_count_addr())++;
              }
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
            else {
              // try to bias towards thread in case object is anonymously biased
              markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                                                              (uintptr_t)markOopDesc::age_mask_in_place |
                                                              epoch_mask_in_place));
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
              // debugging hint
              DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
              if (lockee->cas_set_mark(new_header, header) == header) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
              }
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
          }

          // traditional lightweight locking
          if (!success) {
            markOop displaced = lockee->mark()->set_unlocked();
            entry->lock()->set_displaced_header(displaced);
            bool call_vm = UseHeavyMonitors;
            if (call_vm || lockee->cas_set_mark((markOop)entry, displaced) != displaced) {
              // Is it simple recursive case?
              if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                entry->lock()->set_displaced_header(NULL);
              } else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
            }
          }
          UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
        } else {
            // lock recoder不够,则开始栈扩张,stack_base往下移,重新开始执行
          istate->set_msg(more_monitors);
          UPDATE_PC_AND_RETURN(0); // Re-execute
        }
    }

设置偏向锁

参考

https://blog.51cto.com/14440216/2426781

偏向锁

我们的方法一定要保证线程安全,但是实际情况不一定有互斥, 所以偏向锁是synchronized锁的对象如果没有资源竞争的情况下存在的

偏向锁不会调用os函数实现—第一次会调用

os函数pthread_mutex_lock()//上锁
pthread_mutex_lock(){
     fprintf(stderr,"msg"+pthred_self());
}

当JVM启用了偏向锁模式(1.6以上默认开启),当新创建一个对象的时候,如果该对象所属的class没有关闭偏向锁模式(什么时候会关闭一个class的偏向模式下文会说,默认所有class的偏向模式都是是开启的),那新创建对象的mark word将是可偏向状态,此时mark word中的thread id(参见上文偏向状态下的mark word格式)为0,表示未偏向任何线程,也叫做匿名偏向(anonymously biased)。

加锁过程

case 1:当该对象第一次被线程获得锁的时候,发现是匿名偏向状态,则会用CAS指令,将mark word中的thread id由0改成当前线程Id。如果成功,则代表获得了偏向锁,继续执行同步块中的代码。否则,将偏向锁撤销,升级为轻量级锁。

case 2:当被偏向的线程再次进入同步块时,发现锁对象偏向的就是当前线程,在通过一些额外的检查后(细节见后面的文章),会往当前线程的栈中添加一条Displaced Mark Word为空的Lock Record中,然后继续执行同步块的代码,因为操纵的是线程私有的栈,因此不需要用到CAS指令;由此可见偏向锁模式下,当被偏向的线程再次尝试获得锁时,仅仅进行几个简单的操作就可以了,在这种情况下,synchronized关键字带来的性能开销基本可以忽略。

case 3.当其他线程进入同步块时,发现已经有偏向的线程了,则会进入到撤销偏向锁的逻辑里,一般来说,会在safepoint中去查看偏向的线程是否还存活,如果存活且还在同步块中则将锁升级为轻量级锁,原偏向的线程继续拥有锁,当前线程则走入到锁升级的逻辑里;如果偏向的线程已经不存活或者不在同步块中,则将对象头的mark word改为无锁状态(unlocked),之后再升级为轻量级锁。

由此可见,偏向锁升级的时机为:当锁已经发生偏向后,只要有另一个线程尝试获得偏向锁,则该偏向锁就会升级成轻量级锁。当然这个说法不绝对,因为还有批量重偏向这一机制。

https://blog.51cto.com/14440216/2426781

轻量级锁

在多线程交替执行同步块的情况下

加锁过程

1.在线程栈中创建一个Lock Record,将其obj(即上图的Object reference)字段指向锁对象。

2.直接通过CAS指令将Lock Record的地址存储在对象头的mark word中,如果对象处于无锁状态则修改成功,代表该线程获得了轻量级锁。如果失败,进入到步骤3。

3.如果是当前线程已经持有该锁了,代表这是一次锁重入。设置Lock Record第一部分(Displaced Mark Word)为null,起到了一个重入计数器的作用。然后结束。

4.走到这一步说明发生了竞争,需要膨胀为重量级锁。

解锁过程

1.遍历线程栈,找到所有obj字段等于当前锁对象的Lock Record。

2.如果Lock Record的Displaced Mark Word为null,代表这是一次重入,将obj设置为null后continue。

3.如果Lock Record的Displaced Mark Word不为null,则利用CAS指令将对象头的mark word恢复成为Displaced Mark Word。如果成功,则continue,否则膨胀为重量级锁。

重量级锁

os函数使用的锁 pthread_mutex_lock

synchronized关键字基于上述两个指令实现了锁的获取和释放过程,解释器执行monitorenter时会进入到InterpreterRuntime.cpp的InterpreterRuntime::monitorenter

偏向锁的入口位于synchronizer.cpp文件的ObjectSynchronizer::fast_enter

条件变量

Object.wait Object.notify配合synchronized实现条件变量锁

AQS

锁的是对象

os实现mutex

lock: 
	if(mutex > 0){ 
		mutex = 0; 
		return 0; 
	} else 
		挂起等待; 
	goto lock;
		
unlock: 
	mutex = 1; 
	唤醒等待Mutex的线程; 
	return 0


判断锁过程不是原子


lock: 
	movb $0, %al 
	xchgb %al, mutex  寄存器和内存单元的数据相交换
	if(al寄存器的内容 > 0){ 
		return 0; 
	} else 
		挂起等待; 
	goto lock;
		
unlock: 
	movb $1, mutex 
	唤醒等待Mutex的线程; 
	return 0;

    一条指令保证原子性



“挂起等待”和“唤醒等待线程”的操作如何实现?每个Mutex有一个等待队列,一个线程要在Mutex上挂起等待,首先把自己加入等待队列中,然后置线程状态为睡眠,接着调用调度器函数切换到别的线程。一个线程要唤醒等待队列中的其它线程,只需从等待队列中取出一项,把它的状态从睡眠改为就绪,加入就绪队列,那么下次调度器函数执行时就有可能切换到被唤醒的线程。


并发风险

  1. 性能问题(上下文切换)
  2. 活跃性问题(饥饿、死锁、活锁)
  3. 线程安全(加锁、三大特性、happens-before(八大原则+6条推论))