青年IT男

个人从事金融行业,就职过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就职于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。

TheadLocal一文让你读懂

TheadLocal一文让你读懂

前言

现在正是抗击新型冠状病毒关键时期,为了响应国家号召在家休息减少外出这是作为我们普通老百姓对国家最大的贡献,同时必要外出时必须要戴上口罩,回家后需要使用消毒剂或肥皂洗手。相信国家和一线的工作者还有我们广大的人民群众我们一定能够战胜。有关最新疫情实时数据可以查看官方数据以下链接直达!

疫情实时大数据报告

新型肺炎确诊患者相同行程查询工具

什么是ThreadLocal

回到今天的主题,ThreadLocal我相信有个工作经验的小伙伴一定有所了解,或许你可能已经深入源码了解过,so 今天我们一起来从头到尾、简单到复杂以及深入源码一起来分析。

那么什么是TheadLocal呢?到目前为止你可以这样理解,联想到我们的JMM内存模型,以下代码说明:

 public static void main(String[] args) {
        Integer temp = Integer.valueOf(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> System.out.println("The thread is hold the "+temp+" temp "));
        }
    }

可以从上面的代码看出我们的java内存模型

  • 首先在虚拟机内存分配一个Integer大小的内存然后一个temp内存指向改内存地址
  • 开启10个线程,每个线程从主存中获取temp指向地址内存的副本
  • 然后输出副本内容

But我们的ThreadLocal和JMM有点类似,即每个线程都持有ThreadLocal的一个本地副本线程持有,并且互相隔离、不相互影响。

ThreadLocal使用场景

入门示例
package com.ouwen.springboot.juc;

import java.util.concurrent.TimeUnit;

/**
 * @author <a href="http://youngitman.tech">青年IT男</a>
 * @version v1.0.0
 * @className ThreadLocalTest
 * @description
 * @date 2020-01-19 22:06
 * @JunitTest: {@link  }
 **/
public class ThreadLocalTest {


    public static void main(String[] args) throws InterruptedException {

        testForNonInit();

//        testForInit();

        TimeUnit.MILLISECONDS.sleep(10_000);//jvm在所有非Daemon线程退出后停止
    }

    /***
     *
     * 测试不带初始化方式
     *
     * @author liyong
     * @date 11:32 2020-01-29
     *  * @param
     * @exception
     * @return void
     **/
    private static void testForNonInit() {

        ThreadLocal threadLocal = new ThreadLocal<Integer>();//new 一个示例无初始值

        for (int i = 0; i < 10; i++) {
            final int index = i;
            new Thread(() -> {
                System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
                threadLocal.set(index);
                System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
                threadLocal.remove();
            }).start();
        }

        testMain(threadLocal);
    }

    /***
     *
     * 测试带初始化值方式
     *
     * @author liyong
     * @date 11:32 2020-01-29
     *  * @param
     * @exception
     * @return void
     **/
    private static void testForInit() {

        ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 10);//通过静态方法withInitial传入一个初始化方法,更灵活

        for (int i = 0; i < 10; i++) {
            final int index = i;
            new Thread(() -> {
                System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
                threadLocal.set(index);
                System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
                threadLocal.remove();
            }).start();
        }

        testMain(threadLocal);
    }

    /***
     *
     * 测试主线程
     *
     * @author liyong 
     * @date 11:45 2020-01-29 
     *  * @param threadLocal
     * @exception 
     * @return void 
     **/
    private static void testMain(ThreadLocal threadLocal){
        System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
        threadLocal.set("`mainindex`");
        System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
        threadLocal.remove();
    }

}

  • testForNonInit()输出结果
    The threadThread-9 index number is null at before
    The threadThread-0 index number is null at before
    The threadThread-6 index number is null at before
    The threadThread-7 index number is null at before
    The threadThread-4 index number is null at before
    The threadThread-3 index number is null at before
    The threadThread-5 index number is null at before
    The threadThread-1 index number is null at before
    The threadThread-9 index number is 9 at after 
    The threadThread-8 index number is null at before
    The threadThread-7 index number is 7 at after 
    The threadThread-2 index number is null at before
    The threadThread-4 index number is 4 at after 
    The threadThread-1 index number is 1 at after 
    The threadThread-5 index number is 5 at after 
    The threadThread-6 index number is 6 at after 
    The threadThread-8 index number is 8 at after 
    The threadThread-0 index number is 0 at after 
    The threadmain index number is null at before
    The threadThread-3 index number is 3 at after 
    The threadThread-2 index number is 2 at after 
    The threadmain index number is `mainindex` at after 
    

    可以看出在不带初始化值的方式是threadLocal.get()是没有值的,同时每一个线程threadLocal.set(index)设置的值互不相影响、相互隔离。

  • testForInit()输出结果

    The threadThread-4 index number is 10 at before
    The threadThread-2 index number is 10 at before
    The threadThread-8 index number is 10 at before
    The threadThread-2 index number is 2 at after 
    The threadThread-8 index number is 8 at after 
    The threadThread-5 index number is 10 at before
    The threadThread-5 index number is 5 at after 
    The threadThread-7 index number is 10 at before
    The threadThread-7 index number is 7 at after 
    The threadThread-0 index number is 10 at before
    The threadmain index number is 10 at before
    The threadThread-0 index number is 0 at after 
    The threadThread-1 index number is 10 at before
    The threadThread-1 index number is 1 at after 
    The threadThread-4 index number is 4 at after 
    The threadThread-3 index number is 10 at before
    The threadThread-3 index number is 3 at after 
    The threadThread-9 index number is 10 at before
    The threadThread-9 index number is 9 at after 
    The threadThread-6 index number is 10 at before
    The threadThread-6 index number is 6 at after 
    The threadmain index number is `mainindex` at after 
    

    可以看出在不带初始化值的方式是threadLocal.get()是有 默认值的,同时每一个线程threadLocal.set(index)设置的值互不相影响、相互隔离。

  • 图解

    ThreadLocal

Spring事务使用

​ 在spring中对事物管理的抽象如下:

AbstractTransactionManager

其中DataSourceTransactionManager是我们着重关注的,首先看下我们的使用demo

 @Override
    public void hello2() {
        // 构造一个准备使用此事务的定义信息~~~
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        transactionDefinition.setReadOnly(false);
        //隔离级别,-1表示使用数据库默认级别
        transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

        // 根据此事务属性,拿到一个事务实例   注意此处的入参是一个:TransactionDefinition
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);

        try {
            // =================做你的逻辑start  必须try住,但无需写finally=======================
            // 向数据库插入一条记录
            String sql = "insert into user (name,age) values ('fsx',21)";
            jdbcTemplate.update(sql);

            // 做其余的事情  可能抛出异常
            System.out.println(1 / 0);
            // =================做你的逻辑start=======================
            // 提交事务
            transactionManager.commit(transaction);
        } catch (Exception e) {
            // 若发现异常  事务进行回滚
            transactionManager.rollback(transaction);
            throw e;
        }
    }

AbstractPlatformTransactionManager.getTransaction中可以到

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        Object transaction = doGetTransaction();//着重看这里其他忽略掉
        //...
    }

看DataSourceTransactionManager.doGetTransaction的实现

@Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());//从事务管理器中获取ConnectionHolder,而ConnectionHolder封装了对数据连接的持有
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

继续跟踪org.springframework.jdbc.datasource.DataSourceTransactionManager#obtainDataSource

/**
     * Obtain the DataSource for actual use.获取真实的数据库连接
     * @return the DataSource (never {@code null})
     * @throws IllegalStateException in case of no DataSource set
     * @since 5.0
     */
    protected DataSource obtainDataSource() {
        DataSource dataSource = getDataSource();
        Assert.state(dataSource != null, "No DataSource set");
        return dataSource;
    }

然后使用org.springframework.transaction.support.TransactionSynchronizationManager#getResource

public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//获取真实对象 Unwrap the given resource handle if necessary; otherwise return the given handle as-is
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }

关键的地方到了org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource

private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");//事务和线程绑定 <DataSource,ConnectionHolder>

/**
     * Actually check the value of the resource that is bound for the given key.
     */
    @Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);//获取这个DataSource对应的ConnectionHolder
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();//help gc
            }
            value = null;
        }
        return value;
    }

ThreadLocal源码分析

​ 下面我们一起来通过上面的入门示例分析ThreadLocal源码

  • ThreadLocal的实例化
    • new ThreadLocal()空的构造函数略过
      /**
         * Creates a thread local variable.
         * @see #withInitial(java.util.function.Supplier)
         */
        public ThreadLocal() {
        }
    
    • ThreadLocal.withInitial(method)需要提供一个Supplier方法这是jdk8提供的方式(提供一个生产者方法)
      public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
            return new SuppliedThreadLocal<>(supplier);
        }
    

    SuppliedThreadLocal是TheadLocal的内部类

      static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
    
            private final Supplier<? extends T> supplier;
    
            SuppliedThreadLocal(Supplier<? extends T> supplier) {
                this.supplier = Objects.requireNonNull(supplier);
            }
                    //重写ThreadLocal的initialValue方法,后面会降到调用地方
            @Override
            protected T initialValue() {
                return supplier.get();//调用supplier的get方法执行method方法体
            }
        }
    
  • ThreadLocal.get()方法调用
    public T get() {
          Thread t = Thread.currentThread();//拿到当前线程
          ThreadLocalMap map = getMap(t);//从t线程中获取  ThreadLocal.ThreadLocalMap threadLocals持有的值
          if (map != null) {//当前线程存在ThreadLocalMap
              ThreadLocalMap.Entry e = map.getEntry(this);//以当天ThreadLocal对象为key从ThreadLocal.ThreadLocalMap获取ThreadLocalMap.Entry值
              if (e != null) {
                  @SuppressWarnings("unchecked")
                  T result = (T)e.value;//强制转换T类型
                  return result;
              }
          }
          return setInitialValue();//调用
      }
    

    使用java.lang.ThreadLocal#getMap获取当前线程持有的ThreadLocal.ThreadLocalMap

    ThreadLocalMap getMap(Thread t) {
          return t.threadLocals;
      }
    

    分析java.lang.ThreadLocal.ThreadLocalMap#getEntry方法获取ThreadLocalMap.Entry

      private Entry getEntry(ThreadLocal<?> key) {
              int i = key.threadLocalHashCode & (table.length - 1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。
              Entry e = table[i];
              if (e != null && e.get() == key)//e不等于null并且Entry的key==传入的ThreadLocal
                  return e;
              else
                  return getEntryAfterMiss(key, i, e);//e==null或者key和当前i查找出来TheadLocal不相等
          }
    

    接下来继续分析java.lang.ThreadLocal.ThreadLocalMap#getEntryAfterMiss方法

    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
              Entry[] tab = table;
              int len = tab.length;
                        //根据进入条件e==null或者key和当前i查找,可以知道当进入条件e==null时直接return null
              while (e != null) {
                  ThreadLocal<?> k = e.get();
                  if (k == key)//再次判断ThreadLoacl对象是否相等
                      return e;
                  if (k == null)//k等于null说明被gc回收,触发一些`stale`的清理(后面具体分析回收原理)
                      expungeStaleEntry(i);
                  else
                      i = nextIndex(i, len);//可理解为环查找
                  e = tab[i];
              }
              return null;
          }
    }
    

    下面重点分析java.lang.ThreadLocal.ThreadLocalMap#expungeStaleEntry方法,这个方法主要做了一些stale的Entry的清理工作并且重新整理存在hash索引冲突的元素位置。

    private int expungeStaleEntry(int staleSlot) {//staleSlot表示这个Entry的key被回收的索引位置
              Entry[] tab = table;
              int len = tab.length;
    
              // expunge entry at staleSlot to help gc
              tab[staleSlot].value = null;
              tab[staleSlot] = null;
              size--;//统计减1
    
              // Rehash until we encounter null
              Entry e;
              int i;
              for (i = nextIndex(staleSlot, len);//初始位置
                   (e = tab[i]) != null;//数组中的Entry不为null
                   i = nextIndex(i, len)) {//从i位置开始查找下个索引位置(环型查找)
                  ThreadLocal<?> k = e.get();
                  if (k == null) {//说明Entry的key(ThreadLocal)被gc回收
                      e.value = null;//help gc 
                      tab[i] = null;
                      size--;
                  } else {
                      int h = k.threadLocalHashCode & (len - 1);
                      if (h != i) {//说明i位置的ThreadLocal和h位置的ThreadLocal索引不等,存在冲突
                          tab[i] = null;
    
                          // Unlike Knuth 6.4 Algorithm R, we must scan until
                          // null because multiple entries could have been stale.
                          while (tab[h] != null)//从h位置向下查找为null的位置
                              h = nextIndex(h, len);
                          tab[h] = e;//把i这个位置的entry放到查找到为null值的位置去(整理冲突位置元素)
                      }
                  }
              }
              return i;
          }
    

    理解下java.lang.ThreadLocal.ThreadLocalMap#nextIndex,相当于是环型查找

    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    

    当ThreadLocalMap不存在值调用java.lang.ThreadLocal#setInitialValue进行相关初始化

    private T setInitialValue() {
          T value = initialValue();//调用SuppliedThreadLocal重写的initialValue方法
          Thread t = Thread.currentThread();
          ThreadLocalMap map = getMap(t);
          if (map != null)
              map.set(this, value);//添加当前ThreadLocal的值到ThreadLocalMap
          else
              createMap(t, value);//创建ThreadLocalMap
          return value;
      }
    
    
  • ThreadLocal.set()方法调用
    public void set(T value) {
          Thread t = Thread.currentThread();
          ThreadLocalMap map = getMap(t);//首先获取当前线程持有的 ThreadLocal.ThreadLocalMap 
          if (map != null)
              map.set(this, value);//覆盖该值
          else
              createMap(t, value);//不存在新建 ThreadLocal.ThreadLocalMap 
      }
    

    下面重点分析java.lang.ThreadLocal.ThreadLocalMap#set方法

    private void set(ThreadLocal<?> key, Object value) {
    
              // We don't use a fast path as with get() because it is at
              // least as common to use set() to create new entries as
              // it is to replace existing ones, in which case, a fast
              // path would fail more often than not.
    
              Entry[] tab = table;
              int len = tab.length;
              int i = key.threadLocalHashCode & (len-1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。
    
              for (Entry e = tab[i];//i索引位置初始值
                   e != null;//循环条件Entry不为null
                   e = tab[i = nextIndex(i, len)]) {//环型查找元素
                  ThreadLocal<?> k = e.get();
    
                  if (k == key) {//Entry数组中存在以这个ThreadLocal对象,替换它的值
                      e.value = value;
                      return;
                  }
    
                  if (k == null) {//该Entry的ThreadLocal被gc回收
                      replaceStaleEntry(key, value, i);
                      return;
                  }
              }
                        //不存在Entry,new一个对象
              tab[i] = new Entry(key, value);
              int sz = ++size;//统计值增加1
              if (!cleanSomeSlots(i, sz) && sz >= threshold)
                  rehash();
          }
    

    分析一下java.lang.ThreadLocal.ThreadLocalMap#replaceStaleEntry方法,该方法从staleSlot开始查找替换stale的值(被gc回收了的)

    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                         int staleSlot) {
              Entry[] tab = table;
              int len = tab.length;
              Entry e;
    
              // Back up to check for prior stale entry in current run.
              // We clean out whole runs at a time to avoid continual
              // incremental rehashing due to garbage collector freeing
              // up refs in bunches (i.e., whenever the collector runs).
              int slotToExpunge = staleSlot;
              for (int i = prevIndex(staleSlot, len);//staleSlot索引位置开始向前查找
                   (e = tab[i]) != null;//循环查找到Entry不为null为止
                   i = prevIndex(i, len))//向前环型查找(反向)
                  if (e.get() == null)//查找Entry的key被gc回收的索引位置
                      slotToExpunge = i;
    
              // Find either the key or trailing null slot of run, whichever
              // occurs first
              for (int i = nextIndex(staleSlot, len);//初始值索引位置
                   (e = tab[i]) != null;//查找到Entry不为null为止
                   i = nextIndex(i, len)) {//环型向下查找索引(正向)
                  ThreadLocal<?> k = e.get();
    
                  // If we find key, then we need to swap it
                  // with the stale entry to maintain hash table order.
                  // The newly stale slot, or any other stale slot
                  // encountered above it, can then be sent to expungeStaleEntry
                  // to remove or rehash all of the other entries in run.
                  if (k == key) {//因为ThreadLocal通过线性探针方式,在存在hash冲突的时候查找下一个null位置插入元素,索引这里就存在交换会原谅未冲突的索引位置,提高后面的查找效率
                      e.value = value;
                      tab[i] = tab[staleSlot];
                      tab[staleSlot] = e;
    
                      // Start expunge at preceding stale entry if it exists
                      if (slotToExpunge == staleSlot)
                          slotToExpunge = i;
                      cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                      return;
                  }
    
                  // If we didn't find stale entry on backward scan, the
                  // first stale entry seen while scanning for key is the
                  // first still present in the run.
                  if (k == null && slotToExpunge == staleSlot)
                      slotToExpunge = i;
              }
    
              // If key not found, put new entry in stale slot
              tab[staleSlot].value = null;//help gc
              tab[staleSlot] = new Entry(key, value);//新建一个Entry
    
              // If there are any other stale entries in run, expunge them
              if (slotToExpunge != staleSlot)
                  cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
          }
    

    下面看下java.lang.ThreadLocal.ThreadLocalMap#cleanSomeSlots方法,该方法主要通过启发式的查找过时(stale)的Entry,同时两种触发方式分别是:新增元素、有其他stale的Entry被清理的时候。扫描逻辑为:它执行对数数量的扫描,以平衡无扫描(快速但保留垃圾)和与元素数量成比例的扫描次数

    /**
           * Heuristically scan some cells looking for stale entries.
           * This is invoked when either a new element is added, or
           * another stale one has been expunged. It performs a
           * logarithmic number of scans, as a balance between no
           * scanning (fast but retains garbage) and a number of scans
           * proportional to number of elements, that would find all
           * garbage but would cause some insertions to take O(n) time.
           *
           * @param i a position known NOT to hold a stale entry. The
           * scan starts at the element after i.
           *
           * @param n scan control: {@code log2(n)} cells are scanned,
           * unless a stale entry is found, in which case
           * {@code log2(table.length)-1} additional cells are scanned.
           * When called from insertions, this parameter is the number
           * of elements, but when from replaceStaleEntry, it is the
           * table length. (Note: all this could be changed to be either
           * more or less aggressive by weighting n instead of just
           * using straight log n. But this version is simple, fast, and
           * seems to work well.)
           *
           * @return true if any stale entries have been removed.
           */
          private boolean cleanSomeSlots(int i, int n) {
              boolean removed = false;
              Entry[] tab = table;
              int len = tab.length;
              do {
                  i = nextIndex(i, len);
                  Entry e = tab[i];
                  if (e != null && e.get() == null) {
                      n = len;
                      removed = true;
                      i = expungeStaleEntry(i);
                  }
              } while ( (n >>>= 1) != 0);//对数
              return removed;
          }
    

    当前线程不存在ThreadLocalMap是调用java.lang.ThreadLocal#createMap

    void createMap(Thread t, T firstValue) {
          t.threadLocals = new ThreadLocalMap(this, firstValue);//new一个对象
      }
    

    这里关键看ThreadLocalMap构造函数代码如下:

     ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
              table = new Entry[INITIAL_CAPACITY];
              int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。
              table[i] = new Entry(firstKey, firstValue);
              size = 1;
              setThreshold(INITIAL_CAPACITY);//设置容量阀值
          }
    

    可以参考散列算法了解斐波那契散列法。

ThreadLocal彩蛋

InheritableThreadLocal

在一些特定的场景需要在当前线程开启另外一个线程同时当前现在的ThreadLocal中的ThreadLocal.ThreadLocalMap的值需要传递到被开启的线程中那么InheritableThreadLocal就是这个目的。在接下来的文章会分析这个类。

从早上10点左右开始整理和编写这篇文章确实写得有点累,但对于坚持在一线疫区的英雄们他们才是真正的英雄,向他们致敬!希望通过我写的这篇文章帮助到一些小伙伴,同时希望大家一起行动起来我们一起来打赢这场没有硝烟的战争。

关注我

在这里插入图片描述

0
青年IT男

个人从事金融行业,就职过易极付、思建科技等重庆一流技术团队,目前就职于某网约车平台负责整个支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。

评论已关闭。

This site is protected by wp-copyrightpro.com