English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用Android给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环:
一、消息机制使用
通常消息都是有一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:
mHandlerThread = new ServiceThread(TAG, Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/); mHandlerThread.start(); mHandler = new PowerManagerHandler(mHandlerThread.getLooper());
这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
private final class PowerManagerHandler extends Handler { public PowerManagerHandler(Looper looper) { super(looper, null, true /*async*/); } @Override public void handleMessage(Message msg) { switch (msg.what) { case MSG_USER_ACTIVITY_TIMEOUT: handleUserActivityTimeout(); break; case MSG_SANDMAN: handleSandman(); break; case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT: handleScreenBrightnessBoostTimeout(); break; case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT: checkWakeLockAquireTooLong(); Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); m.setAsynchronous(true); mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT); break; } } }
두 번째, 메시지 메커니즘 원리
그래서 먼저 HandlerThread의 메인 함수 run 함수를 보겠습니다:
public void run() { mTid = Process.myTid(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper();//할당 후 notifyall을 호출하는 것은 getLooper 함수가 mLooper를 반환하기 때문입니다 notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid = -1; }
그런 다음 Lopper의 prepare 함수를 다시 보겠습니다. 마지막으로 Looper 객체를 새로 생성하고 스레드의 지역 변수에 저장합니다.
public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("한 스레드당 Looper가 하나만 생성될 수 있습니다"); } sThreadLocal.set(new Looper(quitAllowed)); }
Looper의 생성자 함수에서 MessageQueue가 생성됩니다
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
MessageQueue의 생성자를 다시 보겠습니다. nativeInit는 native 메서드이며, 반환 값을 mPtr에 저장합니다. 이는 long형 변수로 저장된 포인터입니다.
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
native 함수는 주로 NativeMessageQueue 객체를 생성하고 포인터 변수를 반환합니다.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast<jlong>(nativeMessageQueue); }
NativeMessageQueue 생성자는 mLooper를 가져오며, 없다면 새로운 Looper를 생성합니다.
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
그런 다음 Looper의 생성자를 다시 보겠습니다. eventfd를 호출하여 fd를 생성하는 것을 보여줍니다. eventfd는 주로 프로세스나 스레드 간의 통신에 사용됩니다. 이 블로그에서 eventfd 소개를 보세요.
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1}, mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "wake 이벤트 fd 생성 실패. errno=%d", errno); AutoMutex _l(mLock); rebuildEpollLocked(); }
2.1 c 층에서 epoll을 생성합니다
rebuildEpollLocked 함수를 다시 보겠습니다. epoll을 생성하고 mWakeEventFd를 epoll에 추가하고 mRequests의 fd도 epoll에 추가합니다
void Looper::rebuildEpollLocked() { // 기존 epoll 인스턴스가 있으면 닫습니다. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif close(mEpollFd); } // 새 epoll 인스턴스를 할당하고 깨우기 파이프를 등록합니다. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "epoll 인스턴스 생성 실패. errno=%d", errno); struct epoll_event eventItem; memset(&eventItem, 0, sizeof(epoll_event)); // 데이터 필드 유니온의 미사용 멤버를 모두 0으로 초기화합니다 eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d", errno); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", request.fd, errno); } } }
HandlerThread의 run 함수로 돌아가 Looper의 loop 함수를 분석을 계속합니다
public void run() { mTid = Process.myTid(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid = -1; }
Looper의 loop 함수를 확인해 보겠습니다:
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue;//Looper의 mQueue를 얻습니다 // 이 스레드의 식별자가 로컬 프로세스의 식별자인지 확인하세요. // 그 실제 식별 토큰이 무엇인지 추적하세요. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); for (;;) { Message msg = queue.next(); // might block 이 함수는 블록할 수 있으며, 블록은 epoll_wait입니다. if (msg == null) { // 메시지가 없으면 메시지 큐가 종료 중인 것을 나타냅니다 return; } // 이는 로거가 UI 이벤트를 설정하는 경우 로컬 변수에 있어야 해야 합니다 Printer logging = me.mLogging;//자신이 찍은 출력 if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // 배포 중에 주의해야 할 것은 // thread의 identity가 손상되지 않았습니다. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
MessageQueue 클래스의 next 함수는 주로 nativePollOnce 함수를 호출하고, 그 다음 메시지 큐에서 Message를 꺼냅니다.
Message next() { // 메시지 루프가 이미 quit되고 처리되었으면 여기서 돌아가십시오. // 애플리케이션이 quit 후 looper를 다시 시작하려고 시도할 때 이러한 일이 발생할 수 있습니다. // 지원되지 않습니다. final long ptr = mPtr;//이전에 보존된 포인터 if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 제1회 반복 중에만 int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis);
아래에서 nativePollOnce 이라는 native 함수를 주로 살펴보겠습니다. 이전 포인터를 NativeMessageQueue로 강제 변환한 후 pollOnce 함수를 호출합니다.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
2.2 c 레이어 epoll_wait 블록
pollOnce 함수는, 이 함수 앞의 while 문이 일반적으로 없으며 indent가 0보다 큰 경우를 처리하는 것만 있으며, 이러한 경우는 일반적으로 없기 때문에 pollInner 함수를 바로 보면 됩니다.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
pollInner 함수는 주로 epoll_wait 블록을 호출하며, java 레이어는 매번 블록 시간을 c 레이어에 전달하여, mWakeEventFd 또는 이전에 addFd로 추가된 fd에 이벤트가 오면 epoll_wait가 반환됩니다.
int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - 대기 중: timeoutMillis=%d", this, timeoutMillis); #endif // 다음 메시지가 언제 오기를 기반으로 타임아웃을 조정하세요. if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; } #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis); #endif } // Poll. int result = POLL_WAKE; mResponses.clear();//Empty mResponses mResponseIndex = 0; // We are about to idle. mPolling = true; struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//epoll_wait main thread is blocked here, and the blocking time is also passed from the java layer // No longer idling. mPolling = false; // Acquire lock. mLock.lock(); // Rebuild epoll set if needed. if (mEpollRebuildRequired) { mEpollRebuildRequired = false; rebuildEpollLocked(); goto Done; } // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = POLL_ERROR; goto Done; } // poll timeout을 확인합니다. if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - timeout", this); #endif result = POLL_TIMEOUT; goto Done; } // 모든 이벤트를 처리합니다. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce"} - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) {//��음을 알리는 이벤트를 통지합니다 if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("wake event fd에서 예상치 못한 epoll 이벤트 0x%x를 무시합니다.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd);//이전 addFd 이벤트 if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex));//mResponses에 저장됩니다 } else { ALOGW("Unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // 대기 중인 메시지 callbacks를 호출합니다. mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) {// 이 부분은 C 레이어의 메시지이며, Java 레이어의 메시지는 자신이 관리합니다 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // 에너지를 목록에서 제거합니다. // handleMessage 호출까지 handler에 강한 참조를 유지합니다 // 끝낼 때까지 기다립니다. 그런 다음 handler를 지우기 위해 떨어뜨립니다 *전에* // 우리의 락을 다시 얻습니다. { // handler를 얻습니다 sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce"} - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // handler를 해제합니다 mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // 큐의 머리에 남아 있는 마지막 메시지가 다음 깨우기 시간을 결정합니다. mNextMessageUptime = messageEnvelope.uptime; break; } } // 락을 해제합니다. mLock.unlock(); // 모든 response callbacks를 호출합니다. for (size_t i = 0; i < mResponses.size(); i++) {//이전 addFd 이벤트의 처리입니다. 주로 mResponses를 순회하며 그回调을 호출합니다 Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce"} - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // callback을 호출합니다. 파일 디스크립터가 닫혀질 수 있으므로 주의하세요. // callback 함수(그리고 가능한 경우 재사용된)를 호출합니다. // 기능이 반환되기 전에 파일 디스크립터를 제거할 때 조심스럽게 해야 합니다. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } // 우리가 이제 response 구조체에서 callback 참조를 즉시 clear해야 합니다. // 다음 poll 이전에 자신의 response vector를 clear하지 않습니다. response.request.callback.clear(); result = POLL_CALLBACK; } } return result; }
Looper의 loop 함수를 계속 분석하면 코드를 디버깅하기 위해 자신의 출력을 추가할 수 있습니다. 이전에는 Message의 target의 dispatchMessage를 호출하여 메시지를 배분했습니다
for (;;) { Message msg = queue.next(); // -blocking 될 수 있습니다 if (msg == null) { // 메시지가 없으면 메시지 큐가 종료 중인 것을 나타냅니다 return; } // 이는 로거가 UI 이벤트를 설정하는 경우 로컬 변수에 있어야 해야 합니다 Printer logging = me.mLogging;//자신의 출력 if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // 배포 중에 주의해야 할 것은 // thread의 identity가 손상되지 않았습니다. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
2.3 디버깅 출력을 추가합니다
우리는 자신이 추가한 출력을 보면, Lopper의 setMessageLogging 함수를 통해 출력할 수 있습니다.
public void setMessageLogging(@Nullable Printer printer) { mLogging = printer; } Printer는 인터페이스입니다 public interface Printer { /** * 텍스트 행을 출력에 작성합니다. 종료할 필요가 없습니다. * 주어진 문자열에 줄 바꿈을 추가합니다. */ void println(String x); }
2.4 Java 레이어 메시지 배포 처리
메시지의 배포를 다시 보면, 먼저 Handler의 obtainMessage 함수를 호출합니다.
Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
먼저 obtainMessage가 Message의 obtain 함수를 호출한 것을 보겠습니다
public final Message obtainMessage(int what) { return Message.obtain(this, what); }
Message의 obtain 함수는 새로운 Message를 생성한 후, 그 target을 Handler로 설정합니다
public static Message obtain(Handler h, int what) { Message m = obtain();//Message를 새로 생성합니다 m.target = h; m.what = what; return m; }
이제 이전에 분배한 메시지와 연결해보겠습니다
msg.target.dispatchMessage(msg); 마지막으로 Handler의 dispatchMessage 함수를 호출하며, Handler에서는 다양한 상황에 따라 메시지를 처리합니다.
public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg);//이 경우는 post 형식으로 전송하며 Runnable을 포함한 경우입니다 } else { if (mCallback != null) {//이 경우는 handler가 매개변수로 mCallback을 전달한 경우입니다 if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg);//마지막으로는 자신이 구현한 handleMessage 처리에 도달합니다 } }
2.3 java 레이어 메시지 전송
우리가 java 레이어의 메시지 전송을 다시 보면, 주로 Handler의 sendMessage post와 같은 함수를 호출하며, 결국 아래의 함수를 호출합니다.
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException(" this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); }
Let's take a look at the java layer, which ultimately calls the enqueueMessage function to send messages
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
Finally, in enqueueMessage, add the message to the message queue and then call the nativeWake function of the C layer if necessary
boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException(" msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // 새로운 머리, 차단된 경우 이벤트 큐를 깨웁니다. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // 큐의 중간에 삽입됩니다. 일반적으로 깨우지 않아도 됩니다. // 이벤트 큐의 머리에 바리어가 없다면 이벤트 큐를 올립니다. // 그리고 메시지는 큐에서 가장 빠른 비동기 메시지입니다. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // mQuitting이 false라면 mPtr != 0라고 가정할 수 있습니다. if (needWake) { nativeWake(mPtr); } } return true; }
저희가 이 native 메서드를 보면, 마지막에도 Looper의 wake 함수를 호출합니다.
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake();}} } void NativeMessageQueue::wake() { mLooper->wake();}} }
Looper 클래스의 wake 함수는 mWakeEventfd에 내용을 써서, 이 fd는 단순히 알림일 뿐입니다. pipe와 유사합니다. 마지막으로 epoll_wait를 깨우고, 스레드가 블록하지 않도록 합니다. 먼저 c 계층 메시지를 보내고, 이전에 addFd에 추가된 이벤트를 처리한 후, java 계층 메시지를 처리합니다.
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif uint64_t inc = 1; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
2.4 C 계층 메시지 전송
C 계층에서도 메시지를 보낼 수 있습니다. 주로 Looper의 sendMessageAtTime 함수를 호출하며, handler는 콜백입니다. 메시지를 mMessageEnvelopes에 넣습니다.
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { #if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // 락 획득 AutoMutex _l(mLock); size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: Looper가 현재 메시지를 보내고 있다면, 우리는 건너뛰고 // wake() 호출을 통해 Looper가 처리한 다음에 할 다음 작업을 결정합니다. // messages는 다음 깨어날 시간을 결정하는 데 사용됩니다. 실제로는 그렇습니다 // 이 코드가 Looper 스레드에서 실행되는지 여부에도 관계없이. if (mSendingMessage) { return; } } // lock 해제 // 뉴 메시지를 머리에 추가할 때까지 poll 루프를 깨우지 않습니다. if (i == 0) { wake(); } }
pollOnce에서 epoll_wait 후, mMessageEnvelopes에 있는 메시지를 순회하며 handler의 handleMessage 함수를 호출합니다
while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // 에너지를 목록에서 제거합니다. // handleMessage 호출까지 handler에 강한 참조를 유지합니다 // 끝낼 때까지 기다립니다. 그런 다음 handler를 지우기 위해 떨어뜨립니다 *전에* // 우리의 락을 다시 얻습니다. { // handler를 얻습니다 sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce"} - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // handler를 해제합니다 mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // 큐의 머리에 남아 있는 마지막 메시지가 다음 깨우기 시간을 결정합니다. mNextMessageUptime = messageEnvelope.uptime; break; } }
Looper_test.cpp 파일이 있습니다. 그 안에는 Looper의 많은 사용 방법이 설명되어 있습니다. 확인해 보겠습니다
sp<StubMessageHandler> handler = new StubMessageHandler(); mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1)); StubMessageHandler가 MessageHandler를 상속받으면 handleMessage 메서드를 구현해야 합니다 class StubMessageHandler : public MessageHandler { public: Vector<Message> messages; virtual void handleMessage(const Message& message) { messages.push(message); } };
Message와 MessageHandler 클래스를顺便 확인해 보겠습니다
struct Message { Message() : what(0) { } Message(int what) : what(what) { } /* 메시지 유형. (해석은 핸들러에 맡깁니다) */ int what; }; /** * Looper 메시지 핸들러 인터페이스. * * Looper는 메시지 핸들러에 강한 참조를 유지할 때마다 * handler에 전달할 메시지. Looper::removeMessages 호출을 보장하도록 하세요 * handler로 향하는 임시 메시지를 제거하여 handler * 삭제될 수 있습니다. */ class MessageHandler : public virtual RefBase { protected: virtual ~MessageHandler() { } public: /** * 메시지를 처리합니다. */ virtual void handleMessage(const Message& message) = 0; };
2.5 c층 addFd
Looper.cpp의 addFd에서 fd를 스레드 epoll에 추가할 수도 있습니다. fd에 데이터가 오면 이에 대한 데이터를 처리할 수도 있습니다. 먼저 addFd 함수를 보겠습니다. callBack回调을 주목해야 합니다.
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) { return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); } int Looper::addFd(int fd, int 인식자, int events, const sp<LooperCallback>& 콜백, void* data) { #if DEBUG_CALLBACKS ALOGD("%p ~ addFd - fd=%d, 인식자=%d, events=0x%x, 콜백=%p, 데이터=%p", 이, fd, 인식자, events, callback.get(), data); #endif if (!callback.get()) { if (! mAllowNonCallbacks) { ALOGE("NULL 콜백 설정 시도는 허용되지 않지만 이 루퍼에서는 허용됩니다."); return -1; } if (인식자 < 0) { ALOGE("NULL 콜백 설정 시도가 무효입니다. < 0 인식자와 함께."); return -1; } } else { ident = POLL_CALLBACK; } { // 락 획득 AutoMutex _l(mLock); Request request; request.fd = fd; request.ident = ident; request.events = events; request.seq = mNextRequestSeq++; request.callback = callback; request.data = data; if (mNextRequestSeq = -1) mNextRequestSeq = 0; // 시퀀스 번호 예약 -1 struct epoll_event eventItem; request.initEventItem(&eventItem); ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);//epoll에 추가 if (epollResult < 0) { ALOGE("fd %d의 epoll 이벤트 추가에 오류, errno=%d", fd, errno); return -1; } mRequests.add(fd, request);//mRequests에 넣기 } else { int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);//업데이트 if (epollResult < 0) { if (errno == ENOENT) { // ENOENT를 용서하십시오. 이는 이전 파일 디스크리터가 // callback가 등록되기 전에 닫혔고 동시에 새로운 // 같은 번호의 파일 디스크리터가 생성되었으며 지금 // 처음 등록되는 중입니다. 이 오류는 자연스럽게 발생할 수 있습니다. // when a callback has the side-effect of closing the file descriptor // before returning and unregistering itself. Callback sequence number // checks further ensure that the race is benign. // // Unfortunately due to kernel limitations we need to rebuild the epoll // set from scratch because it may contain an old file handle that we are // now unable to remove since its file descriptor is no longer valid. // No such problem would have occurred if we were using the poll system // call instead, but that approach carries others disadvantages. #if DEBUG_CALLBACKS ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor " "being recycled, falling back on EPOLL_CTL_ADD, errno=%d", this, errno); #endif epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); if (epollResult < 0) { ALOGE("Error modifying or adding epoll events for fd %d, errno=%d", fd, errno); return -1; } scheduleEpollRebuildLocked(); } else { ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); return -1; } } mRequests.replaceValueAt(requestIndex, request); } } // lock 해제 return 1; }
pollOnce 함수에서는 먼저 mRequests에 있는 일치하는 fd를 찾고, 그 다음 Response를 새로 생성하고 Response와 Request를 매칭시킵니다.
} else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } }
아래에서는 mResponses에 있는 Response를 순회하고 그 request의 콜백을 호출합니다.
for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce"} - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // callback을 호출합니다. 파일 디스크립터가 닫혀질 수 있으므로 주의하세요. // callback 함수(그리고 가능한 경우 재사용된)를 호출합니다. // 기능이 반환되기 전에 파일 디스크립터를 제거할 때 조심스럽게 해야 합니다. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } // 우리가 이제 response 구조체에서 callback 참조를 즉시 clear해야 합니다. // 다음 poll 이전에 자신의 response vector를 clear하지 않습니다. response.request.callback.clear(); result = POLL_CALLBACK; } }
그런 다음 Looper_test.cpp가 어떻게 사용되는지 또 보겠습니다?
Pipe pipe; StubCallbackHandler handler(true); handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
우리가 handler의 setCallback 함수를 보겠습니다.
class CallbackHandler { public: void setCallback(const sp<Looper>& looper, int fd, int events) { looper->addFd(fd, 0, events, staticHandler, this);//就是调用了looper的addFd函数,并且回调 } protected: virtual ~CallbackHandler() { } virtual int handler(int fd, int events) = 0; private: static int staticHandler(int fd, int events, void* data) {//这个就是回调函数 return static_cast<CallbackHandler*>(data)->handler(fd, events); } }; class StubCallbackHandler : public CallbackHandler { public: int nextResult; int callbackCount; int fd; int events; StubCallbackHandler(int nextResult) : nextResult(nextResult), callbackCount(0), fd(-1), events(-1) { } protected: virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的 callbackCount += 1; this->fd = fd; this->events = events; return nextResult; } };
我们结合Looper的addFd一起来看,当callback是有的,我们新建一个SimpleLooperCallback
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) { return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); }
这里的Looper_callbackFunc是一个typedef
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);
그런 다음 SimpleLooperCallback을 다시 보겠습니다.
class SimpleLooperCallback : public LooperCallback { protected: virtual ~SimpleLooperCallback(); public: SimpleLooperCallback(Looper_callbackFunc callback); virtual int handleEvent(int fd, int events, void* data); private: Looper_callbackFunc mCallback; };SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) : mCallback(callback) { } SimpleLooperCallback::~SimpleLooperCallback() { } int SimpleLooperCallback::handleEvent(int fd, int events, void* data) { return mCallback(fd, events, data); }
마지막으로 callback을 호출합니다->handleEvent(fd, events, data)를 호출했으며, callback은 SimpleLooperCallback입니다. 여기서 data는 이전에 전달된 CallbackHandler의 this 포인터입니다
따라서 마지막으로 staticHandler를 호출했으며, data->handler, 이는 this입니다->handler, 마지막에는 가상 함수가 StubCallbackHandler의 handler 함수로 호출됩니다.
물론 이렇게 복잡하지 않게도 할 수 있습니다. 두 번째 addFd 함수를 직접 사용하면 됩니다. 물론 callBack는 스스로 정의한 클래스를 통해 LooperCallBack 클래스를 구현하면 간단해집니다.
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
2.6 java 레이어의 addFd
이전에는 c 레이어의 Looper에서만 addFd를 사용할 수 있다고 생각했지만, 실제로는 java 레이어에서도 JNI를 통해 이 기능을 구현했습니다.
MessageQueue에서 addOnFileDescriptorEventListener을 통해 이 기능을 구현할 수 있습니다.
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd, @OnFileDescriptorEventListener.Events int events, @NonNull OnFileDescriptorEventListener listener) { if (fd == null) { throw new IllegalArgumentException("fd must not be null"); } if (listener == null) { throw new IllegalArgumentException("listener must not be null"); } synchronized (this) { updateOnFileDescriptorEventListenerLocked(fd, events, listener); } }
OnFileDescriptorEventListener 이라는 콜백을 다시 보겠습니다
public interface OnFileDescriptorEventListener { public static final int EVENT_INPUT = 1 << 0; public static final int EVENT_OUTPUT = 1 << 1; public static final int EVENT_ERROR = 1 << 2; /** @hide */ @Retention(RetentionPolicy.SOURCE) @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR}) public @interface Events {} @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events); }
그 뒤에 updateOnFileDescriptorEventListenerLocked 함수를 호출했습니다
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,) OnFileDescriptorEventListener listener) { final int fdNum = fd.getInt$(); int index = -1; FileDescriptorRecord record = null; if (mFileDescriptorRecords != null) { index = mFileDescriptorRecords.indexOfKey(fdNum); if (index >= 0) { record = mFileDescriptorRecords.valueAt(index); if (record != null && record.mEvents == events) { return; } } } if (events != 0) { events |= OnFileDescriptorEventListener.EVENT_ERROR; if (record == null) { if (mFileDescriptorRecords == null) { mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>(); } record = new FileDescriptorRecord(fd, events, listener);//fd는 FileDescriptorRecord 객체에 저장됩니다 mFileDescriptorRecords.put(fdNum, record);//mFileDescriptorRecords에 저장됩니다 } else { record.mListener = listener; record.mEvents = events; record.mSeq += 1; } nativeSetFileDescriptorEvents(mPtr, fdNum, events);//native 함수 호출 } else if (record != null) { record.mEvents = 0; mFileDescriptorRecords.removeAt(index); } }
native가 마지막으로 NativeMessageQueue의 setFileDescriptorEvents 함수를 호출했습니다
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->setFileDescriptorEvents(fd, events); }
setFileDescriptorEvents 함수는 두 번째 addFd를 호출하는 것이기 때문에 NativeMessageQueue가 LooperCallback를 상속받았다는 것을 확신할 수 있습니다
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) { if (events) { int looperEvents = 0; if (events & CALLBACK_EVENT_INPUT) { looperEvents |= Looper::EVENT_INPUT; } if (events & CALLBACK_EVENT_OUTPUT) { looperEvents |= Looper::EVENT_OUTPUT; } mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, reinterpret_cast<void*>(events)); } else { mLooper->removeFd(fd); } }
물론이죠,handleEvent 함수를 구현해야 합니다
class NativeMessageQueue : public MessageQueue, public LooperCallback { public: NativeMessageQueue(); virtual ~NativeMessageQueue(); virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj); void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis); void wake(); void setFileDescriptorEvents(int fd, int events); virtual int handleEvent(int fd, int events, void* data);
handleEvent는 looper에서 epoll_wait 이후로, 추가한 fd에 데이터가 있을 때 이 함수를 호출합니다
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) { int events = 0; if (looperEvents & Looper::EVENT_INPUT) { events |= CALLBACK_EVENT_INPUT; } if (looperEvents & Looper::EVENT_OUTPUT) { events |= CALLBACK_EVENT_OUTPUT; } if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) { events |= CALLBACK_EVENT_ERROR; } int oldWatchedEvents = reinterpret_cast<intptr_t>(data); int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents, fd, events); //콜백 호출 if (!newWatchedEvents) { return 0; // fd를 등록해제합니다 } if (newWatchedEvents != oldWatchedEvents) { setFileDescriptorEvents(fd, newWatchedEvents); } return 1; }
최종적으로 Java의 MessageQueue에서 dispatchEvents는 JNI 레이어에서 다시 호출되며, 이전에 등록된 콜백 함수를 호출합니다
// 네이티브 코드에서 호출됩니다. private int dispatchEvents(int fd, int events) { // 파일 디스크립터 레코드와 가능한 모든 상태를 가져오세요. final FileDescriptorRecord record; final int oldWatchedEvents; final OnFileDescriptorEventListener listener; final int seq; synchronized (this) { record = mFileDescriptorRecords.get(fd);//fd를 통해 FileDescriptorRecord를 얻으세요 if (record == null) { return 0; // 잘못된 이벤트, 리스너가 등록되지 않았습니다 } oldWatchedEvents = record.mEvents; events &= oldWatchedEvents; // 현재 감시 집합 기준으로 이벤트를 필터링하세요 if (events == 0) { return oldWatchedEvents; // 잘못된 이벤트, 감시 이벤트가 변경되었습니다 } listener = record.mListener; seq = record.mSeq; } // 락 밖에서 리스너를 호출하세요. int newWatchedEvents = listener.onFileDescriptorEvents(//리스너 콜백 record.mDescriptor, events); if (newWatchedEvents != 0) { newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR; } // 리스너가 감시할 이벤트 집합을 변경하면 파일 디스크립터 레코드를 업데이트하세요. // 관찰할 이벤트와 리스너 자체가 업데이트되지 않았던 이후입니다. if (newWatchedEvents != oldWatchedEvents) { synchronized (this) { int index = mFileDescriptorRecords.indexOfKey(fd); if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record && record.mSeq == seq) { record.mEvents = newWatchedEvents; if (newWatchedEvents == 0) { mFileDescriptorRecords.removeAt(index); } } } } // native 코드가 관리할 새로운 이벤트 집합을 반환합니다. return newWatchedEvents; }
이것이 이 문서의 전체 내용입니다. 많은 도움이 되었기를 바랍니다. 또한, 나아가는 교재에 많은 지지를 부탁드립니다.
성명: 이 문서의 내용은 인터넷에서 가져왔으며, 저작권자는 모두에게 있으며, 인터넷 사용자가 자발적으로 기여하고 자체로 업로드한 내용입니다. 이 사이트는 소유권을 가지지 않으며, 인공 편집 처리를 하지 않았으며, 관련 법적 책임을 부담하지 않습니다. 저작권 문제가 의심되는 내용이 있다면, 이메일을 notice#w로 보내 주세요.3codebox.com(이메일을 보내면, #을 @으로 변경하십시오. 신고를 하고, 관련 증거를 제공하십시오. 사실을 확인하면, 이 사이트는 즉시 의심스러운 저작권 내용을 삭제합니다。)