int pthread_create(pthread_t * tid, const pthread_attr_t * attr, void * ( * func) (void * ), void * arg);
其返回值是一个整数,若创建进程成功返回0,否则,返回其他错误代码,也是正整数。
void pthread_exit (void *status);
参数是指针类型,用于存储线程结束后返回状态。
int pthread_join (pthread_t tid, void ** status);
第一个参数表示要等待的进程的id;
第二参数表示要等待的进程的返回状态,是个二级指针。
pthread_t pthread_self (void);
用于返回当前进程的ID
int pthread_detach (pthread_t tid);
参数是指定线程的ID,指定的ID的线程变成分离状态;若指定线程是分离状态,则 如果线程退出,那么它所有的资源都将释放,如果线程不是分离状态,线程必须保留它的线程ID、退出状态,直到其他线程对他调用的pthread_join()函数
互斥锁
pthread_mutex_t mutex 锁对象
pthread_mutex_init(&mutex,NULL) 在主线程中初始化锁为解锁状态
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 编译时初始化锁位解锁状态
pthread_mutex_lock(&mutex): 访问临界区加锁操作
pthread_mutex_unlock(&mutex): 访问临界区解锁操作
条件变量
pthread_cond_init
pthread_cond_destroy
pthread_cond_wait
pthread_cond_timedwait
pthread_cond_signal
pthread_cond_broadcast
信号量
int sem_init(sem_t * sem, int pshared, unsigned int value);初始化信号量
int sem_wait(sem_t *sem); 信号量减一操作,有线程申请资源
int sem_post(sem_t *sem);信号量加一操作,有线程释放资源
int sem_destroy(sem_t *sem); 销毁信号量。
public void run() {
while (!Thread.currentThread().isInterrupted()&& more work to do) {
try {
...
sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();//重新设置中断标示
}
}
}
抛出InterruptedException异常后,中断标示位会自动清除,所以要重新设置中断表示
https://www.cnblogs.com/onlywujun/p/3565082.html
public class ThreadExample {
static Thread thread = null;
int i;
public volatile static boolean runing = true;
public static void main(String[] args) throws InterruptedException {
traditional();
Thread.sleep(100);
runing = false;
}
public static void traditional() {
thread = new Thread() {
@Override
public void run() {
while (runing){
//i++;
}
}
};
thread.start();
}
public static void aa(){
Integer i=1;
i.toString();
}
}
traditional被重排,running变成函数变量
public static void traditional() {
bool temp = running;
thread = new Thread() {
@Override
public void run() {
while (temp){
//i++;
}
}
};
thread.start();
}
所以线程不会被中断,一直再运行
这五种状态在不同编程语言里会有简化合并。例如,C 语言的 POSIX Threads 规范,就把初始状态和可运行状态合并了;Java 语言里则把可运行状态和运行状态合并了,这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了;细化休眠状态
rt.jar包的Unsafe类,提供硬件级别的原子性操作
public native int getInt(Object var1, long var2);
public native void putInt(Object var1, long var2, int var4);
等等
park unpark
底层jvm实现都是通过cas来实现原子性,park、unpark是通过条件变量来实现阻塞和唤醒
juc的原子性操作都是基于Unsafe类实现
基于Unsafe类实现的同步工具,主要作用是挂起和唤醒线程,LockSupport.park、LockSupport.unpark
// LockSupport
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// blocker在什么对象上进行的阻塞操作
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
UNSAFE使用park和unpark进行线程的阻塞和唤醒操作,park和unpark底层是借助系统层(Linux下)方法pthread_mutex
和pthread_cond
来实现的,通过pthread_cond_wait
函数可以对一个线程进行阻塞操作,在这之前,必须先获取pthread_mutex
,通过pthread_cond_signal
函数对一个线程进行唤醒操作。
基于LockSupport实现的同步工具
实现一个双向队列,挂起的线程插入到队列的尾部
子类继承aqs,子类实现tryAcquire,用cas判断锁状态,成功则返回,失败则线程挂起并插入到aqs阻塞队列尾部(aqs实现)。子类实现tryRelease,设置锁没有被占用状态,设置成功返回true,否则返回false
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
实现两个抽象函数
tryAcquire()
tryRelease()
}
}
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
加锁调用流程
ReentrantLock: lock()
FairSync: lock()
AbstractQueuedSynchronizer: acquire(int arg)
ReentrantLock: tryAcquire(int acquires)
解锁调用流程
ReentrantLock: unlock()
AbstractQueuedSynchronizer: release(int arg)
Sync: tryAcquire(int releases)
java虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现,无论是显式同步(有明确的monitorenter和monitorexit指令,即同步代码块)还是隐式同步都是如此。在Java语言中,同步用的最多的地方可能是被synchronized修饰的同步方法。同步方法并不是由monitorenter和monitorexit指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的ACC_SYNCHRONIZED标志来隐式实现的
当对象状态为偏向锁(biasable)时,mark word存储的是偏向的线程ID;当状态为轻量级锁(lightweight locked)时,mark word存储的是指向线程栈中Lock Record的指针;当状态为重量级锁(inflated)时,为指向堆中的monitor对象(ObjectMonitor)的指针。
当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列尾部,然后暂停当前线程。当持有锁的线程释放锁前,会将cxq中的所有元素移动到EntryList中去,并唤醒EntryList的队首线程。
如果一个线程在同步块中调用了Object#wait方法,会将该线程对应的ObjectWaiter从EntryList移除并加入到WaitSet中,然后释放锁。当wait的线程被notify之后,会将对应的ObjectWaiter从WaitSet移动到EntryList中。
设置对象头锁状态,通过cas_set_mark(atomic_cmpxchg_at)解决并发问题
atomic_cmpxchg_at(x, p, (ptrdiff_t)offset, e)
if x == *(p+offset) then *(p+offset) = e
同数据库的乐观锁操作,1.get old time value 2.update set new value where now value =old time value
每个对象都存在着一个monitor与之关联,用于重量级锁
ObjectMonitor() {
_header = NULL;
_count = 0; //记录个数
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL; //指向持有锁的线程
_WaitSet = NULL; //处于wait状态的线程,会被加入到_WaitSet
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; //处于等待锁block状态的线程,会被加入到该列表
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
线程运行到synchronized时候,会在栈上创建一个BasicObjectLock对象,即对象头里的Lock Record指针指向的地方
class BasicObjectLock {
friend class VMStructs;
private:
BasicLock _lock; // the lock, must be double word aligned
oop _obj; // object holds the lock;
public:
// Manipulation
oop obj() const { return _obj; }
void set_obj(oop obj) { _obj = obj; }
BasicLock* lock() { return &_lock; }
// Note: Use frame::interpreter_frame_monitor_size() for the size of BasicObjectLocks
// in interpreter activation frames since it includes machine-specific padding.
static int size() { return sizeof(BasicObjectLock)/wordSize; }
// GC support
void oops_do(OopClosure* f) { f->do_oop(&_obj); }
static int obj_offset_in_bytes() { return offset_of(BasicObjectLock, _obj); }
static int lock_offset_in_bytes() { return offset_of(BasicObjectLock, _lock); }
};
class BasicLock {
friend class VMStructs;
friend class JVMCIVMStructs;
private:
volatile markOop _displaced_header;
public:
markOop displaced_header() const { return _displaced_header; }
void set_displaced_header(markOop header) { _displaced_header = header; }
void print_on(outputStream* st) const;
// move a basic lock (used during deoptimization
void move_to(oop obj, BasicLock* dest);
static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); }
};
CASE(_monitorenter): {
oop lockee = STACK_OBJECT(-1); //锁对象
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
//找到一个空闲的Lock Record
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
//设置持有该lock record的对象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// implies UseBiasedLocking
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// try revoke bias
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (lockee->cas_set_mark(header, mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// try rebias
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
if (lockee->cas_set_mark(new_header, mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
// try to bias towards thread in case object is anonymously biased
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (lockee->cas_set_mark(new_header, header) == header) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
// traditional lightweight locking
if (!success) {
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
if (call_vm || lockee->cas_set_mark((markOop)entry, displaced) != displaced) {
// Is it simple recursive case?
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// lock recoder不够,则开始栈扩张,stack_base往下移,重新开始执行
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
设置偏向锁
https://blog.51cto.com/14440216/2426781
我们的方法一定要保证线程安全,但是实际情况不一定有互斥, 所以偏向锁是synchronized锁的对象如果没有资源竞争的情况下存在的
偏向锁不会调用os函数实现—第一次会调用
os函数pthread_mutex_lock()//上锁
pthread_mutex_lock(){
fprintf(stderr,"msg"+pthred_self());
}
当JVM启用了偏向锁模式(1.6以上默认开启),当新创建一个对象的时候,如果该对象所属的class没有关闭偏向锁模式(什么时候会关闭一个class的偏向模式下文会说,默认所有class的偏向模式都是是开启的),那新创建对象的mark word将是可偏向状态,此时mark word中的thread id(参见上文偏向状态下的mark word格式)为0,表示未偏向任何线程,也叫做匿名偏向(anonymously biased)。
加锁过程
case 1:当该对象第一次被线程获得锁的时候,发现是匿名偏向状态,则会用CAS指令,将mark word中的thread id由0改成当前线程Id。如果成功,则代表获得了偏向锁,继续执行同步块中的代码。否则,将偏向锁撤销,升级为轻量级锁。
case 2:当被偏向的线程再次进入同步块时,发现锁对象偏向的就是当前线程,在通过一些额外的检查后(细节见后面的文章),会往当前线程的栈中添加一条Displaced Mark Word为空的Lock Record中,然后继续执行同步块的代码,因为操纵的是线程私有的栈,因此不需要用到CAS指令;由此可见偏向锁模式下,当被偏向的线程再次尝试获得锁时,仅仅进行几个简单的操作就可以了,在这种情况下,synchronized关键字带来的性能开销基本可以忽略。
case 3.当其他线程进入同步块时,发现已经有偏向的线程了,则会进入到撤销偏向锁的逻辑里,一般来说,会在safepoint中去查看偏向的线程是否还存活,如果存活且还在同步块中则将锁升级为轻量级锁,原偏向的线程继续拥有锁,当前线程则走入到锁升级的逻辑里;如果偏向的线程已经不存活或者不在同步块中,则将对象头的mark word改为无锁状态(unlocked),之后再升级为轻量级锁。
由此可见,偏向锁升级的时机为:当锁已经发生偏向后,只要有另一个线程尝试获得偏向锁,则该偏向锁就会升级成轻量级锁。当然这个说法不绝对,因为还有批量重偏向这一机制。
https://blog.51cto.com/14440216/2426781
在多线程交替执行同步块的情况下
加锁过程
1.在线程栈中创建一个Lock Record,将其obj(即上图的Object reference)字段指向锁对象。
2.直接通过CAS指令将Lock Record的地址存储在对象头的mark word中,如果对象处于无锁状态则修改成功,代表该线程获得了轻量级锁。如果失败,进入到步骤3。
3.如果是当前线程已经持有该锁了,代表这是一次锁重入。设置Lock Record第一部分(Displaced Mark Word)为null,起到了一个重入计数器的作用。然后结束。
4.走到这一步说明发生了竞争,需要膨胀为重量级锁。
解锁过程
1.遍历线程栈,找到所有obj字段等于当前锁对象的Lock Record。
2.如果Lock Record的Displaced Mark Word为null,代表这是一次重入,将obj设置为null后continue。
3.如果Lock Record的Displaced Mark Word不为null,则利用CAS指令将对象头的mark word恢复成为Displaced Mark Word。如果成功,则continue,否则膨胀为重量级锁。
os函数使用的锁 pthread_mutex_lock
synchronized关键字基于上述两个指令实现了锁的获取和释放过程,解释器执行monitorenter时会进入到InterpreterRuntime.cpp的InterpreterRuntime::monitorenter
偏向锁的入口位于synchronizer.cpp文件的ObjectSynchronizer::fast_enter
Object.wait Object.notify配合synchronized实现条件变量锁
lock:
if(mutex > 0){
mutex = 0;
return 0;
} else
挂起等待;
goto lock;
unlock:
mutex = 1;
唤醒等待Mutex的线程;
return 0
判断锁过程不是原子
lock:
movb $0, %al
xchgb %al, mutex 寄存器和内存单元的数据相交换
if(al寄存器的内容 > 0){
return 0;
} else
挂起等待;
goto lock;
unlock:
movb $1, mutex
唤醒等待Mutex的线程;
return 0;
一条指令保证原子性
“挂起等待”和“唤醒等待线程”的操作如何实现?每个Mutex有一个等待队列,一个线程要在Mutex上挂起等待,首先把自己加入等待队列中,然后置线程状态为睡眠,接着调用调度器函数切换到别的线程。一个线程要唤醒等待队列中的其它线程,只需从等待队列中取出一项,把它的状态从睡眠改为就绪,加入就绪队列,那么下次调度器函数执行时就有可能切换到被唤醒的线程。