记得以前我们使用类似“快牙”这些文件分享工具的时候,一开始就是先在 手机A 上创建一个“房间”,然后连接上 手机A WiFi 热点的其他手机(即这些手机处于一个局域网内)就可以发现到这个房间并加入到这个房间里面,然后就可以互相分享文件了。那没有建立连接的情况下,“发现房间”这个功能是怎么实现的呢? 首先,既然 手机A 处于局域网中,那么根据 手机A 当前在局域网的 IP 地址和子网掩码,就可以获得这个局域网内所有机器的 IP 地址 的范围。如果在没有建立连接的情况下,手机A 就可以给这个范围内的每个 IP 地址都发送一个消息 —— 那么如果某个 IP 地址的机器(设为 手机B)会对这个消息做出回应,便说明 手机B 是 手机A 的“自己人”,那么 手机A 便可以告诉 手机B 它在当前的局域网建了一个“房间”,房间号是个啥,然后 手机B 可以选择是否加入到这个“房间”。 在Java网络编程(1)中,我们已经知道可以使用 NetworkInterface 来获得机器在局域网内 IP 地址; 在Java网络编程(2)中,我们知道使用 UDP,便可以在不建立连接的情况下,直接向某个 IP 地址发送消息; 如果每次都是遍历这个局域网内所有的 IP 地址,并使用 UDP 向每个 IP 发送消息,那样就有点麻烦了。事实上,我们可以使用广播。每个局域网都有一个对应的广播地址,向广播地址发送的数据包通过网关设备(比如路由器)时,网关设备会向局域网的每台设备发送一份该数据包的副本。通过 IP 和子网掩码计算广播地址的方法简单的形容就是 (IP地址)|(~子网掩码)—— 将子网掩码按位取反再和IP地址进行或运算,比如当前机器在局域网内的地址为 192.168.1.3,子网掩码为 255.255.255.0(取反后为 0.0.0.255),那么广播地址为 192.168.1.255。广播也是在不建立连接的情况下就发送数据,所以广播不能通过 TCP 实现,只能是 UDP。在 Java 中,通过 UDP 进行广播和单播(即只向一个 IP 地址发送数据包)的程序几乎没有区别,只是地址由一个特定的单播地址(如 192.168.1.3)变为了其对应的广播地址(192.168.1.255)。 现在让我们来实现下面的功能: 1、Broadcaster 创建一个房间,并每隔 1 秒向局域网广播一个特定的消息; 2、同一个局域网的 Device 如果收到了 3 次这个特定的消息,之后便向 Broadcaster 发送加入房间的消息; 3、Broadcaster 收到 Device 请求加入房间的消息后,将 Device 加入房间。 首先定义发送者类和接收者类,他们都实现了 Runnable,分别可以用来发送和接收: Sender.java - import java.io.IOException;
- import java.net.*;
- public class Sender implements Runnable {
- private static final byte[] EMPTY_DATA = new byte[0];
- private final DatagramSocket socket;
- private final SocketAddress broadcastAddress;
- private final long sendingInterval; // unit is ms
- public Sender(DatagramSocket socket,
- SocketAddress broadcastAddress, int sendingInterval) {
- this.socket = socket;
- this.broadcastAddress = broadcastAddress;
- this.sendingInterval = sendingInterval;
- }
- @Override
- public void run() {
- while (true) {
- byte[] data = getNextData();
- if (data == null || data.length == 0) {
- break;
- }
- DatagramPacket outPacket = new DatagramPacket(
- data, data.length, broadcastAddress);
- try {
- socket.send(outPacket);
- System.out.println("Sender: Data has been sent");
- Thread.sleep(sendingInterval);
- } catch (IOException | InterruptedException ex) {
- System.err.println("Sender: Error occurred while sending packet");
- break;
- }
- }
- System.out.println("Sender: Thread is end");
- }
- /**
- * 获得下一次发送的数据<br>
- * 子类需要重写这个方法,返回下一次要发送的数据
- *
- * @return 下一次发送的数据
- */
- public byte[] getNextData() {
- return EMPTY_DATA;
- }
- }
复制代码Receiver.java - import java.io.IOException;
- import java.net.*;
- public class Receiver implements Runnable {
- private final int BUF_SIZE = 512;
- private final DatagramSocket socket;
- public Receiver(DatagramSocket socket) {
- this.socket = socket;
- }
- @Override
- public void run() {
- byte[] inData = new byte[BUF_SIZE];
- DatagramPacket inPacket = new DatagramPacket(inData, inData.length);
- while (true) {
- try {
- socket.receive(inPacket);
- if (!handlePacket(inPacket)) {
- break;
- }
- } catch (IOException ex) {
- System.out.println("Receiver: Socket was closed.");
- break;
- }
- }
- System.out.println("Receiver: Thread is end");
- }
- /**
- * 处理接收到的数据报<br>
- * 子类需要重写这个方法,处理接收到的数据包,并返回是否继续接收
- *
- * @param packet 接收到的数据报
- * @return 是否需要继续接收
- */
- public boolean handlePacket(DatagramPacket packet) {
- return false;
- }
- }
复制代码然后我们定义 Device 和 Broadcaster: Device.java - import java.io.IOException;
- import java.net.*;
- public class Device {
- private static final int DEFAULT_LISTENING_PORT = 10000;
- private final InetAddress address;
- private final int port;
- private DatagramSocket socket;
- public Device(int port) throws IOException {
- this.port = port;
- this.address = InetAddress.getLocalHost();
- }
- public Device(InetAddress address, int port) {
- this.address = address;
- this.port = port;
- }
- public void start() throws SocketException, InterruptedException {
- System.out.println("Device has been started...");
- InetAddress lanAddr = LANAddressTool.getLANAddressOnWindows();
- if (lanAddr != null) {
- System.out.println("Device: LAN Address: " + lanAddr.getHostAddress());
- }
- socket = new DatagramSocket(port);
- Receiver receiver = new Receiver(socket) {
- int recvCount = 0;
- @Override
- public boolean handlePacket(DatagramPacket packet) {
- String recvMsg = new String(packet.getData(), 0, packet.getLength());
- if ("ROOM".equals(recvMsg)) {
- System.out.printf("Device: Received msg '%s'n", recvMsg);
- recvCount++;
- if (recvCount == 3) {
- byte[] data = "JOIN".getBytes();
- DatagramPacket respMsg = new DatagramPacket(
- data, data.length, packet.getSocketAddress()); // 此时 packet 包含了发送者地址和监听端口
- try {
- socket.send(respMsg);
- System.out.println("Device: Sent response 'JOIN'");
- } catch (IOException ex) {
- ex.printStackTrace(System.err);
- }
- return false; // 停止接收
- }
- }
- return true;
- }
- };
- Thread deviceThread = new Thread(receiver);
- deviceThread.start(); // 启动接收数据包的线程
- deviceThread.join();
- close();
- System.out.println("Device has been closed.");
- }
- public void close() {
- if (socket != null) {
- socket.close();
- }
- }
- @Override
- public String toString() {
- return "Device {" + "address=" + address + ", port=" + port + '}';
- }
- public static void main(String[] args) throws Exception {
- Device device = new Device(DEFAULT_LISTENING_PORT);
- device.start();
- }
- }
复制代码Broadcaster.java - import java.net.*;
- public class Broadcaster {
- private static final int DEFAULT_BROADCAST_PORT = 10000;
- private final InetAddress bcAddr;
- private final int bcPort;
- private DatagramSocket socket;
- public Broadcaster(InetAddress broadcastAddress, int broadcastPort) {
- this.bcAddr = broadcastAddress;
- this.bcPort = broadcastPort;
- }
- public void start() throws SocketException, InterruptedException {
- System.out.println("Broadcaster has been started...");
- final Room room = new Room("Test");
- System.out.printf("Broadcaster: Created room '%s'nn", room.getName());
- socket = new DatagramSocket();
- SocketAddress bcSocketAddr = new InetSocketAddress(bcAddr, bcPort);
- Sender sender = new Sender(socket, bcSocketAddr, 1000) {// 每隔 1000ms 广播一次
- final byte[] DATA = "ROOM".getBytes();
- @Override
- public byte[] getNextData() {
- return DATA;
- }
- };
- Receiver recver = new Receiver(socket) {
- @Override
- public boolean handlePacket(DatagramPacket packet) {
- String recvMsg = new String(packet.getData(), 0, packet.getLength());
- if ("JOIN".equals(recvMsg)) {
- Device device = new Device(packet.getAddress(), packet.getPort());
- room.addDevice(device);
- room.listDevices();
- }
- return true; // 一直接收
- }
- };
- Thread senderThread = new Thread(sender);
- Thread recverThread = new Thread(recver);
- senderThread.start(); // 启动发送(广播)数据包的线程
- recverThread.start(); // 启动接收数据包的线程
- senderThread.join();
- recverThread.join();
- close();
- }
- public void close() {
- if (socket != null) {
- socket.close();
- }
- }
- public static void main(String[] args) throws Exception {
- InetAddress bcAddr = LANAddressTool.getLANBroadcastAddressOnWindows();
- if (bcAddr != null) {
- System.out.println("Broadcast Address: " + bcAddr.getHostAddress());
- Broadcaster broadcaster = new Broadcaster(bcAddr, DEFAULT_BROADCAST_PORT);
- broadcaster.start();
- } else {
- System.out.println("Please check your LAN~");
- }
- }
- }
复制代码Room.java - import java.util.*;
- public class Room {
- private final String name;
- private final List<Device> devices;
- public Room(String name) {
- this.name = name;
- this.devices = new ArrayList<>();
- }
- public boolean addDevice(Device device) {
- return devices.add(device);
- }
- public String getName() {
- return name;
- }
- public void listDevices() {
- System.out.printf("Room (%s), current devices:n", name);
- for (Device device : devices) {
- System.out.println(device);
- }
- }
- }
复制代码(完整的 Demo 可以访问:https://github.com/mizhoux/LA...) 我们将这个 Demo 打包成 jar,然后开始运行: 1、首先我们在本机上启动 Broadcaster:
 2、我们将本机作为一个 Device 启动:
 可以看到此时 Broadcaster 创建的房间已经有了一个 Device:
 3、我们启动局域网内的另外一台设备:
 此时 Broadcaster 创建的房间便有两个 Device:
 4、再启动局域网内的一台设备:
 此时房间里则有三个 Device:
 因为 UDP 在不需要建立连接的基础上就可以发送消息,所以它可以方便的用来探测局域网内特定类型的机器 —— 这是个很有用的功能 —— 又比如一个集群当中可能会突然有机器宕机,为了检测这一事件的发生,就需要集群 master机器 每隔一定的时间向每台机器发送若干心跳检测包,如果有回复说明机器正常,否则说明该机器出现了故障,此时不需要连接而且高效的 UDP 就十分适合这种场合。当然,我们始终还是要考虑到 UDP 是不可靠的协议,它并不能代替 TCP —— 永远需要根据环境,来选择最合适的技术。 |