Convenient messaging between threads

2 votes · 3 comments

This snippet came about while I was struggling with messaging between different threads. The documentation for the event loop (Looper, Handler and HandlerThread) is not that clear, at least to me. The idea is to use a static, thread-safe instance for emitting messages to other components, even when they are running in different threads. You can use a simple addressing mechanism or safely broadcast your message to all subscribed components. This could be useful when you are dealing, for example, with a rendering thread that needs to communicate with your state machine(s), which in turn need to broadcast state changes (which was my case).

raw ·
copy
· download
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import android.os.Handler.Callback;
import android.util.Log;

/**
 * The transceiver provides a convenient way to decorate an instance with receive and
 * transmit capabilities. It can (and this is encouraged) be used with the {@link Broker}
 * instance.
 *
 * <p>The name is not part of the transceiver's functionality, but may be useful while
 * debugging; it is used as the name of the worker thread and as the log tag.
 *
 * <p>This class can be subclassed (override {@link #handleMessage(Message)}) or used in a
 * "has-a" relation by passing an arbitrary {@link Callback} listener to the constructor.
 *
 * <p>Every transceiver starts its own {@link HandlerThread}; call {@link #quit()} when the
 * transceiver is no longer needed, otherwise that thread keeps running.
 */
public class Transceiver {

    /** Number of transceivers created so far; only touched through {@link #nextId()}. */
    private static int ID = 0;

    private String mName = "Transceiver";
    private HandlerThread mHandlerThread;
    private Handler mHandler;

    /**
     * Delegates incoming messages to {@link Transceiver#handleMessage(Message)} when no
     * proprietary callback is associated with the transceiver.
     */
    private class HandlerListener implements Callback {
        @Override
        public boolean handleMessage(Message msg) {
            return Transceiver.this.handleMessage(msg);
        }
    }

    /**
     * Creates a transceiver with an automatically generated name.
     */
    public Transceiver() {
        mName = "Transceiver(" + nextId() + ")";
        create(new HandlerListener());
    }

    /**
     * Creates a transceiver with an arbitrary name.
     *
     * @param name a custom name
     */
    public Transceiver(String name) {
        nextId();
        mName = name;
        create(new HandlerListener());
    }

    /**
     * Creates a transceiver with an arbitrary name and an own listener.
     *
     * @param name a custom name
     * @param l    your own listener; it receives the messages instead of
     *             {@link #handleMessage(Message)}
     */
    public Transceiver(String name, Callback l) {
        nextId();
        mName = name;
        create(l);
    }

    /**
     * Increments and returns the instance counter. Synchronized because transceivers may be
     * constructed from several threads and {@code ++ID} alone is not atomic.
     */
    private static synchronized int nextId() {
        return ++ID;
    }

    /**
     * Starts the worker thread and binds a handler with the given callback to its looper.
     *
     * @param l the callback that receives the messages
     * @throws IllegalStateException if the worker thread did not provide a looper
     */
    private void create(Callback l) {
        mHandlerThread = new HandlerThread(getName());
        mHandlerThread.start();
        Log.d(mName, "Waiting for looper...");
        // getLooper() itself blocks until the started thread has prepared its looper,
        // so no additional busy-waiting is required here.
        Looper looper = mHandlerThread.getLooper();
        if (looper == null) {
            // Fail with a clear message instead of a NullPointerException inside Handler.
            throw new IllegalStateException("Handler thread '" + mName + "' has no looper");
        }
        mHandler = new Handler(looper, l);
        Log.d(mName, "Ready");
    }

    /**
     * @return The name of this transceiver
     */
    public final String getName() {
        return mName;
    }

    /**
     * @return Its internal handler
     */
    public final Handler getHandler() {
        return mHandler;
    }

    /**
     * Stops the message loop of this transceiver so that its worker thread can terminate.
     * Messages sent to the handler afterwards are not delivered.
     */
    public void quit() {
        mHandler.getLooper().quit();
    }

    /**
     * The callback method for received messages. You can override it when deriving from
     * Transceiver. It is only invoked when no own listener was passed to the constructor.
     *
     * @param msg the message
     * @return true, if the message was handled. This default implementation returns false.
     */
    public boolean handleMessage(Message msg) {
        return false;
    }
}

// ------------------------------- BROKER -----------------------------------------

import java.util.ArrayList;

import android.os.Handler;
import android.os.Message;

/**
 * The broker is a kind of repository for {@link Handler} instances. It provides a simple and
 * very convenient mechanism to establish communication between different handlers. Just
 * subscribe a handler instance and use the returned address with this Broker.
 *
 * <p>All public methods are synchronized on the broker instance.
 *
 * <p>Addresses are stable: unsubscribing a handler only clears its slot, so the addresses of
 * all other handlers stay valid and an address is never handed out twice.
 *
 * <p>Usage example:
 * <pre>
 *   this.MyCompAddr = Broker.instance.subscribe( new Handler() );
 *   // or, when using transceiver
 *   this.MyRendererAddr = Broker.instance.subscribe( new MyTransceiver("Renderer").getHandler() );
 *   [...]
 *   // how to send messages
 *   Message m = Message.obtain();
 *   m.what = 1; // could be an identifier or op-code or whatever you want
 *   Bundle b = new Bundle(); // or use more complex (key,value)-pairs.
 *   b.putString("data", "Any Data");
 *   m.setData(b);
 *   Broker.instance.post(this.MyRendererAddr, m);
 * </pre>
 */
public class Broker {

    /**
     * The static instance.
     */
    public static final Broker instance = new Broker();

    /**
     * Subscribed handlers, indexed by address. A {@code null} entry marks an address whose
     * handler was unsubscribed; slots are never removed or reused, which keeps every
     * address unambiguous (at the price of one empty slot per unsubscription).
     */
    private final ArrayList<Handler> mHandlers;

    private Broker() {
        mHandlers = new ArrayList<Handler>();
    }

    /**
     * Subscribes a handler, so you can send messages easily to it.
     * Mind there is no check for doubled subscription.
     *
     * @param h The handler, must not be null
     * @return The "address" for the handler
     * @throws IllegalArgumentException if {@code h} is null
     * @see #unsubscribe(int)
     */
    public synchronized int subscribe(Handler h) {
        if (h == null) {
            throw new IllegalArgumentException("handler must not be null");
        }
        mHandlers.add(h);
        return mHandlers.size() - 1;
    }

    /**
     * Unsubscribes a handler. Invalid addresses are ignored.
     *
     * @param address the address returned by {@link #subscribe(Handler)}
     */
    public synchronized void unsubscribe(int address) {
        if (isAddressValid(address)) {
            // Clear the slot instead of removing it: removing would shift all following
            // handlers and silently invalidate the addresses already handed out.
            mHandlers.set(address, null);
        }
    }

    /**
     * @param address the address to check
     * @return true, if the address was handed out and its handler is still subscribed
     */
    protected synchronized final boolean isAddressValid(int address) {
        return address >= 0
                && address < mHandlers.size()
                && mHandlers.get(address) != null;
    }

    /**
     * Posts a copy of the message to the addressed receiver.
     *
     * @param address the address of the receiver
     * @param m       the message; it is copied, so the caller may reuse it
     * @return true, if the address exists and the message was placed into the receiver's
     *         queue, i.e. the message could be delivered
     */
    public synchronized boolean post(int address, Message m) {
        if (!isAddressValid(address)) {
            return false;
        }
        return mHandlers.get(address).sendMessage(Message.obtain(m));
    }

    /**
     * Sends a copy of the message to all subscribed handlers.
     *
     * @param m The message
     * @return The number of notified handlers, i.e. successfully sent messages.
     */
    public synchronized int broadcast(Message m) {
        int notified = 0;
        for (Handler h : mHandlers) {
            // Skip unsubscribed slots; count only messages that were actually enqueued.
            if (h != null && h.sendMessage(Message.obtain(m))) {
                ++notified;
            }
        }
        return notified;
    }
}

// ------------------------------------------------------------------------------------
// Some implementation examples
// ------------------------------------------------------------------------------------

/**
 * A place where I store my addresses.
 * Read-only access to once set addresses is thread-safe!
 * Every field starts at -1, which the {@link Broker} treats as an invalid address, so
 * messages posted before an address was assigned are rejected instead of reaching
 * whichever handler happens to own address 0.
 */
public class AddressPool {
    public static int Subsystem1 = -1, Subsystem2 = -1, MyActivity = -1;
}

// ## extending Transceiver (is-a-relation) ##

public class Subsystem2 extends Transceiver {

    public final static String TAG = "Subsystem2";
    public final static int WHAT = 3862; // a magic number

    /**
     * Logs the sender of the message and answers messages coming from Subsystem1.
     *
     * @param m the received message
     * @return always true
     */
    @Override
    public boolean handleMessage(Message m) {
        switch (m.what) {
            case Subsystem1.WHAT:
                Log.i(TAG, "Received message from " + Subsystem1.TAG);
                // attention: when answering we need a new message
                Message msg = Message.obtain(m);
                // Tag the answer with our own identifier; receivers tell the sender
                // apart by 'what' (see the switch in Subsystem1.handleMessage).
                msg.what = WHAT;
                Broker.instance.post(AddressPool.Subsystem1, msg);
                break;
            case MyActivity.WHAT:
                Log.i(TAG, "Received message from " + MyActivity.TAG);
                break;
        }
        return true;
    }
}

// ## wrapping/using Transceiver (has-a-relation) ##

package com.o1.android;

import java.util.Timer;
import java.util.TimerTask;

import com.o1.android.util.message.Broker;
import com.o1.android.util.message.Transceiver;

import android.os.Bundle;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Message;
import android.os.Handler.Callback;
import android.util.Log;

/**
 * Example of a component that owns a {@link Transceiver} and periodically posts messages
 * from a timer thread. After {@link #onQuit()} the instance cannot be started again,
 * because both its timer and its transceiver thread are shut down.
 */
public class Subsystem1 extends TimerTask implements Callback {

    public final static String TAG = "Subsystem1";
    public final static int WHAT = 175; // any identifier

    private Transceiver mTransceiver;
    private int mMyAddress = -1; // -1 means "not subscribed"
    private Timer mTimer;

    public Subsystem1() {
        mTransceiver = new Transceiver(TAG, this);
        mTimer = new Timer("SubsystemTimer");
    }

    /**
     * Subscribes to the broker and starts the periodic posting. Calling it again while
     * already started has no effect.
     */
    public void onStart() {
        if (mMyAddress >= 0) {
            // Already started: subscribing again would register a second address and
            // scheduling the same TimerTask twice throws an IllegalStateException.
            return;
        }
        mMyAddress = Broker.instance.subscribe(mTransceiver.getHandler());
        // we use a timer to asynchronously post messages
        mTimer.schedule(this, 0, 125);
    }

    /**
     * Stops the timer, unsubscribes from the broker and stops the transceiver thread.
     */
    public void onQuit() {
        mTimer.cancel();
        Broker.instance.unsubscribe(mMyAddress);
        mMyAddress = -1;
        mTransceiver.quit();
    }

    /**
     * @return the broker address of this subsystem, or -1 if it is not subscribed
     */
    public final int getBrokerAddress() {
        return mMyAddress;
    }

    @Override
    public boolean handleMessage(Message m) {
        switch (m.what) {
            case Subsystem2.WHAT:
                Log.i(TAG, "Received message from " + Subsystem2.TAG);
                break;
            case MyActivity.WHAT:
                Log.i(TAG, "Received message from " + MyActivity.TAG);
                break;
        }
        // immediate response/delegate with a copy of our message
        Broker.instance.post(AddressPool.MyActivity, Message.obtain(m));
        return true;
    }

    /** Timer callback: posts a message tagged with {@link #WHAT} to the activity. */
    @Override
    public void run() {
        Message m = Message.obtain();
        m.what = WHAT;
        Broker.instance.post(AddressPool.MyActivity, m);
    }
}

// ## connecting activities ##

public class MyActivity extends Activity implements Callback {

    private Subsystem1 mSubsystem1;
    private Subsystem2 mSubsystem2;
    //... other member fields

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);
        /*
         * We have different types of transceiver implementations.
         * One as "has" (subsystem1) and the other as "is"-relation.
         */
        mSubsystem1 = new Subsystem1();
        mSubsystem2 = new Subsystem2();
        AddressPool.Subsystem2 = Broker.instance.subscribe(mSubsystem2.getHandler());
        // additionally, we can use activities (or other components with loopers)
        // mind that our activity implements Callback
        AddressPool.MyActivity = Broker.instance.subscribe(new Handler(this));
        // ... your stuff, e.g. a button that triggers messages
    }

    @Override
    public void onStart() {
        super.onStart();
        mSubsystem1.onStart();
        AddressPool.Subsystem1 = mSubsystem1.getBrokerAddress();
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        mSubsystem1.onQuit();
        AddressPool.Subsystem1 = -1;
        // The broker is static and outlives this activity: unsubscribe everything that
        // was registered in onCreate(), otherwise the handler keeps referencing the
        // destroyed activity and the Subsystem2 thread keeps running.
        Broker.instance.unsubscribe(AddressPool.Subsystem2);
        AddressPool.Subsystem2 = -1;
        mSubsystem2.quit();
        Broker.instance.unsubscribe(AddressPool.MyActivity);
        AddressPool.MyActivity = -1;
    }

    @Override
    public boolean handleMessage(Message msg) {
        // handle your messages here
        return true;
    }
}
Add a comment

3 Comments

Errata: In lines 313,316,325 you need to assign the addresses to the related AddressPool.XXX fields, of course.

Sorry!

Reply · May 19, 2009, 12:06 a.m.

Shouldn't you put some kind of Thread.sleep here:

while(!mHandlerThread.isAlive()) {};

Reply · Aug. 9, 2009, 9:05 a.m.

You can use sleep here (I say so without having tested it). Maybe this would relieve the CPU a little. Most important: you need to wait for the looper thread, or you'll get a NullPointerException in the next line. IMHO it really doesn't matter whether you use sleep or a fierce while() block.

btw: There's a bug in the naive addressing mechanism when using unsubscribe(int). If subscribe is invoked after unsubscribe has been called, the returned address is ambiguous. This needs to be fixed, of course.

Reply · Aug. 13, 2009, 6:14 a.m.