Skip to content

实现一个简易的Handler

我们先要理解Handler整体的一个实现机制,才能更方便后续的理解源码。

Message

我们需要一个Message类来表示我们每个需要执行的任务。

java
public class Message {

    //唯一标识
    public int what;
    //数据(使用时再进行强转类型)
    public Object obj;
    //时间戳
    public long when;

}

MessageQueue

因为同时可能存在很多个Message,而这些Message不是在一个时间消费掉的,他有不同的时间戳(when),所以我们需要建立一个能存放这么许多个Message的容器,也就是MessageQueue,也就是消息队列。具体存放就是按照时间戳的先后顺序进行存放,这样在消费事件时只需要从头开始取出进行消费就可以了。

那么应该如何来存放这个消息呢?首先我们或许会想到数组。但是数组他不能动态申请内存大小,并且在进行插入时会移动大量的数据,显然是不合适的。

这时候,我们可以用链表进行存放,既不用移动数据,内存空间只需要按需申请就可以了,改变顺序只需要改变引用就好了,虽然随机访问的效率很慢,但是我们只需要取出表头的Message就可以了。

因此,我们为Message类新建一个指向下一个数据的字段:

java
public final class Message {
    //唯一标识
    public int what;
    //数据
    public Object obj;
    //时间戳
    public long when;
	//下一个节点
    public Message next;
}

其中,MessageQueue中的主要方法有两个:

  1. 插入message,因为可能存在多个线程向消息队列发送消息,所以内部还需要做一下线程同步。
  2. 获取表头的message。我们不断的取值,判断如果当前的消息能被处理(即时间戳<=当前时间),就直接返回就好了。反之,则计算需要等待的时间,进行休眠,等待一段时间后再重新唤醒。Tips:如果在休眠的过程中传入了一个需要被处理的消息,则需要做一下判断进行唤醒。思路理清楚了,代码也很好写出来了:
java
public class MessageQueue {

    //链表中的首条消息
    private Message mMessages;

    //向消息队列中插入数据
    void enqueueMessage(Message msg, long when) {
        synchronized (this) {
            //用于标记是否需要唤醒 next 方法
            boolean needWake = false;
            Message p = mMessages;
            //如果链表头为空或者时间比当前插入时间戳大,就将msg作为头部
            if(p == null || when == 0 || when < p.when) {
                msg.next = p;
                mMessages = msg;
                //需要唤醒
                needWake = mBlocked;
            } else {
                Message prev;
                //从链表头向链表尾遍历,寻找链表中第一条时间戳比 msg 大的消息,将 msg 插到该消息的前面
                for(;;) {
                    prev = p;
                    p = p.next;
                    if(p == null || when < p.when) {
                        break;
                    }
                }
                msg.next = p;
                prev.next = msg;
            }
            if(needWake) {
                //唤醒 next() 方法
                nativeWake();
            }
        }
    }

    private boolean mBlocked = false;

    //获取队列中的头消息
    Message next() {
        int nextPollTimeoutMills = 0;
        for(;;) {
            nativePollOnce(nextPollTimeoutMills);
            synchronized (this) {
                //获取当前时间
                final long now = SystemClock.uptimeMillis();
                Message msg = mMessages;
                if(msg != null) {
                    if(now < msg.when) {
                        //如果当前时间还未到达消息的的处理时间,那么就计算还需要等待的时间
                        nextPollTimeoutMills = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        //可以处理队头的消息了,第二条消息成为队头
                        mMessages = msg.next;
                        msg.next = null;
                        mBlocked = false;
                        return msg;
                    }
                } else {
                    nextPollTimeoutMills = -1;
                }
                mBlocked = true;
            }
        }
    }

    //休眠指定时间
    private void nativePollOnce(long nextPollTimeoutMillis) {

    }

    //唤醒 next() 方法
    private void nativeWake() {

    }

}

Handler

Handler的作用就是用于向消息队列发送消息。我们不仅想要他能发送Message消息,也能发送Runnable消息(通过包装成Message),能实现线程切换。

java
public class Message {

    //唯一标识
    public int what;
    //数据
    public Object obj;
    //时间戳
    public long when;
    //下一个节点
    public Message next;
    //将Runnable包装成Message
    public Runnable callback;
    //指向 Message 的发送者,同时也是 Message 的最终处理者
    public Handler target;

}

Handler类主要有发送Message和Runnable,分发消息和处理消息这几个方法。

java
public class Handler {

    private MessageQueue mQueue;

    public Handler(MessageQueue mQueue) {
        this.mQueue = mQueue;
    }

    public final void post(Runnable r) {
        sendMessageDelayed(getPostMessage(r), 0);
    }

    public final void postDelayed(Runnable r, long delayMillis) {
        sendMessageDelayed(getPostMessage(r), delayMillis);
    }

    public final void sendMessage(Message r) {
        sendMessageDelayed(r, 0);
    }

    public final void sendMessageDelayed(Message msg, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

    public void sendMessageAtTime(Message msg, long uptimeMillis) {
        msg.target = this;
        mQueue.enqueueMessage(msg, uptimeMillis);
    }

    private static Message getPostMessage(Runnable r) {
        Message m = new Message();
        m.callback = r;
        return m;
    }

    //由外部来重写该方法,以此来消费 Message
    public void handleMessage(Message msg) {

    }

    //用于分发消息
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            msg.callback.run();
        } else {
            handleMessage(msg);
        }
    }

}

在使用时,我们将子线程的Handler和主线程的消息队列绑定在一起,由主线程从MainMessageQueue中依次取出Message然后由Handler进行分发,就完成了线程的切换。

java
Handler handler = new Handler(mainThreadMessageQueue) {
    @Override
    public void handleMessage(Message msg) {
        switch (msg.what) {
            case 1: {
                String ob = (String) msg.obj;
                break;
            }
            case 2: {
                List<String> ob = (List<String>) msg.obj;
                break;
            }
        }
    }
};
Message messageA = new Message();
messageA.what = 1;
messageA.obj = "https://github.com/leavesCZY";
Message messageB = new Message();
messageB.what = 2;
messageB.obj = new ArrayList<String>();
handler.sendMessage(messageA);
handler.sendMessage(messageB);

Looper

那么如何在Handler中拿到主线程的消息队列并取出Message并回调Handler呢?就需要一个中转器——Looper。

java
public class Looper {
    final MessageQueue mQueue;

    final Thread mThread;

    private static Looper sMainLooper; //使用静态变量方便子线程获取

    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>(); //为每一个线程维护一个Looper

    private Looper() {
        mQueue = new MessageQueue();
        mThread = Thread.currentThread();
    }

    //通过prepare来获取当前线程的Looper实例
    public static void prepare() {
        if(sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
    }

    public static void prepareMainLooper() {
        prepare();
        synchronized (Looper.class) {
            if(sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }

    public static Looper getMainLooper() {
        synchronized (Looper.class) {
            return sMainLooper;
        }
    }

    public static Looper myLooper() {
        return sThreadLocal.get();
    }

    //循环从MessageQueue中取出Message,分发给Handler
    public static void loop() {
        final Looper me = new Looper();
        if(me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
        for(;;) {
            Message msg = queue.next();
            msg.target.dispatchMessage(msg);
        }
    }
}

总结

  • Message:用来表示要执行的任务
  • Handler:子线程持有的 Handler 如果绑定到的是主线程的 MessageQueue 的话,那么子线程发送的 Message 就可以由主线程来消费,以此来实现线程切换,执行 UI 更新操作等目的
  • MessageQueue:即消息队列,通过 Handler 发送的消息并非都是立即执行的,需要先按照 Message 的优先级高低(延时时间的长短)保存到 MessageQueue 中,之后再来依次执行
  • Looper:Looper 用于从 MessageQueue 中循环获取 Message 并将之传递给消息处理者(即消息发送者 Handler 本身)来进行消费,每条 Message 都有个 target 变量用来指向 Handler,以此把 Message 和其处理者关联起来。不同线程之间通过互相拿到对方的 Looper 对象,以此来实现跨线程发送消息

Handler源码

Handler一共有七个构造方法,但目前使用的仅仅有两个,我们正常使用得方法是val handler = Handler(mainLooper),从这个方法点进去,发现构造方法如下:

java
    public Handler(@Nullable Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }

        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
        mIsShared = false;
    }

Looper的初始化

在初始化Handler中,如果没有传入Looper的参数,那么就会调用Looper.prepare()来获取当前线程的Looper。如果Looper为null,那么就会抛出异常,所以我们在初始化Handler时一定要调用Looper.prepare()完成Looper的初始化。

在Looper类中,看到MyLooper()方法:

java
    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }

只是从sThreadLocal中获取了当前线程的Looper。sThreadLocal 又是通过 prepare(boolean) 方法来进行初始化赋值的,且只能赋值一次,重复调用将抛出异常。而ThreadLocal的特性是为每个线程维护一个单一的变量,所以不同的线程和Looper是一一对应的关系

java
    /** Initialize the current thread as a looper.
      * This gives you a chance to create handlers that then reference
      * this looper, before actually starting the loop. Be sure to call
      * {@link #loop()} after calling this method, and end it by calling
      * {@link #quit()}.
      */
    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }

Looper的构造方法:

java
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }

我们发现这是一个私有方法,所以Looper和消息队列、线程都是一一对应的,初始化完成后就不能改变。

Handler发送消息

Handler用于发送消息的方法很多,但最终都会回到sendMessageAtTime(@NonNull Message msg, long uptimeMillis)方法:

java
    public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        //如果队列为空直接抛出异常,因为最后消息会交给MessageQueue处理
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
java
    private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
            long uptimeMillis) {
        msg.target = this; //将target指向本身Handler,因为最后这个消息还是要交给这个Handler本身进行处理
        msg.workSourceUid = ThreadLocalWorkSource.getUid();

        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis); //交给消息队列处理
    }

MessageQueue

MessageQueue大概就与上面一致的了:

  • 因为存在多个线程同时往一个 MessageQueue 发送消息的可能,所以 enqueueMessage 内部肯定需要进行线程同步
  • 可以看出 MessageQueue 内部是以链表的结构来存储 Message 的(Message.next),根据 Message 的时间戳大小来决定其在消息队列中的位置
  • mMessages 代表的是消息队列中的第一条消息。如果 mMessages 为空(消息队列是空的),或者 mMessages 的时间戳要比新消息的时间戳大,则将新消息插入到消息队列的头部;如果 mMessages 不为空,则寻找消息列队中第一条触发时间比新消息晚的非空消息,将新消息插到该消息的前面
java
    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }

        synchronized (this) {
            if (msg.isInUse()) {
                throw new IllegalStateException(msg + " This message is already in use.");
            }

            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

MessageQueue通过next()方法读取消息并回调给Handler,通过无限循环判断当前时间有没有可以处理的消息,如果没有,那么就会导致Loop线程休眠直到条件满足后再重新遍历:

java
@UnsupportedAppUsage
Message next() {
    ···
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        //将 Loop 线程休眠挂起
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // 尝试检索下一条消息。如果找到,返回。
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 被屏障阻挡的。查找队列中的下一个异步消息。
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    //队头消息还未到处理时间,计算需要等待的时间
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // 获取到了消息
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
            ···
        }
        ···
        }
        ···
    }
}

next() 方法又是通过 Looper 类的 loop() 方法来循环调用的,loop() 方法内也是一个无限循环,唯一跳出循环的条件就是 queue.next()方法返回为 null。因为 next() 方法可能会触发阻塞操作,所以没有消息需要处理时也会导致 loop() 方法被阻塞着,而当 MessageQueue 有了新的消息,Looper 就会及时地处理这条消息并调用 msg.target.dispatchMessage(msg) 方法将消息回传给 Handler 进行处理。

java
/**
 * Run the message queue in this thread. Be sure to call
 * {@link #quit()} to end the loop.
 */
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    ···	
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
        ···
        msg.target.dispatchMessage(msg);
        ···
    }
}

Handler的dispatchMessage(msg)方法就是在向下分发消息,机制如下:

  1. 如果该 Message 是通过 post(Runnable)等方法进行发送的,那么就直接回调该 Runnable 对象
  2. 如果在初始化 Handler 时传入了 Callback 对象,则优先交由其处理,如果 Callback 的 handleMessage 方法返回了 true,则结束流程
  3. 调用 Handler 的handleMessage 方法来处理消息,外部通过重写该方法来定义业务逻辑

源码如下:

java
    /**
     * Handle system messages here.
     */
    public void dispatchMessage(@NonNull Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

    private static void handleCallback(Message message) {
        message.callback.run();
    }

    /**
     * Subclasses must implement this to receive messages.
     */
    public void handleMessage(@NonNull Message msg) {
    }

消息屏障

Handler中,为了让一些高优先级的异步消息能优先执行,采用了一种消息屏障的机制,可以发送一个屏障消息到MessageQueue中,当MessageQueue接受到这个屏障消息时,会停止当前所有的同步消息,往后遍历直到所有异步消息全部执行完。

Handler 的构造函数中的async参数就用于控制发送的 Message 是否属于异步消息。

java
    private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
            long uptimeMillis) {
        msg.target = this;
        msg.workSourceUid = ThreadLocalWorkSource.getUid();
        if (mAsynchronous) {
            //设为异步消息
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
java
@UnsupportedAppUsage
Message next() {
    ···
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        nativePollOnce(ptr, nextPollTimeoutMillis);
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) { //target 为 null 即属于屏障消息
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                //循环遍历,找到屏障消息后面的第一条异步消息进行处理
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            ···
        }
        ···
    }
}

我们可以通过调用 Message 对象的 setAsynchronous(boolean async) 方法来主动发送异步消息。而如果想要主动发送屏障消息的话,就需要通过反射来调用 MessageQueue 的 postSyncBarrier() 方法了,该方法被系统隐藏起来了。

退出Looper循环

所有子线程的Looper都支持退出Loop循环,即 quitAllowed 只有主线程才能是 false。

java
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }

MessageQueue中退出的方法:

java
void quit(boolean safe) {
    if (!mQuitAllowed) {
        //MessageQueue 设置了不允许退出循环,直接抛出异常
        throw new IllegalStateException("Main thread not allowed to quit.");
    }
    synchronized (this) {
        if (mQuitting) {
            //避免重复调用
            return;
        }
        mQuitting = true;
        if (safe) {
            //只移除所有尚未执行的消息,不移除时间戳等于当前时间的消息
            removeAllFutureMessagesLocked();
        } else {
            //移除所有消息
            removeAllMessagesLocked();
        }
        // We can assume mPtr != 0 because mQuitting was previously false.
        nativeWake(mPtr);
    }
}

总结

  1. 每个 Handler 都会和一个 Looper 实例关联在一起,可以在初始化 Handler 时通过构造函数主动传入实例,否则就会默认使用和当前线程关联的 Looper 对象
  2. 每个 Looper 都会和一个 MessageQueue 实例关联在一起,每个线程都需要通过调用 Looper.prepare()方法来初始化本线程独有的 Looper 实例,并通过调用Looper.loop()方法来使得本线程循环向 MessageQueue 取出消息并执行。Android 系统默认会为每个应用初始化和主线程关联的 Looper 对象,并且默认就开启了 loop 循环来处理主线程消息
  3. MessageQueue 按照链接结构来保存 Message,执行时间早(即时间戳小)的 Message 会排在链表的头部,Looper 会循环从链表中取出 Message 并回调给 Handler,取值的过程可能会包含阻塞操作
  4. Message、Handler、Looper、MessageQueue 这四者就构成了一个生产者和消费者模式。Message 相当于产品,MessageQueue 相当于传输管道,Handler 相当于生产者,Looper 相当于消费者
  5. Handler 对于 Looper、Handler 对于 MessageQueue、Looper 对于 MessageQueue、Looper 对于 Thread ,这几个之间都是一一对应的关系,在关联后无法更改,但 Looper 对于 Handler、MessageQueue 对于 Handler 可以是一对多的关系
  6. Handler 能用于更新 UI 包含了一个隐性的前提条件:Handler 与主线程 Looper 关联在了一起。在主线程中初始化的 Handler 会默认与主线程 Looper 关联在一起,所以其 handleMessage(Message msg) 方法就会由主线程来调用。在子线程初始化的 Handler 如果也想执行 UI 更新操作的话,则需要主动获取 mainLooper 来初始化 Handler
  7. 对于我们自己在子线程中创建的 Looper,当不再需要的时候我们应该主动退出循环,否则子线程将一直无法得到释放。对于主线程 Loop 我们则不应该去主动退出,否则将导致应用崩溃