“一次,两次,以这个价格成交”,是的,你没听错,我们说的就是竞拍。我们都在电影或现场拍卖中听说过,人们在拍卖中相互竞争,以赢得最终的头衔。一些平台正在为在线拍卖提供实时竞价。在本文中,我们将讨论实时竞价背后的机制、工作原理以及使用 Nest.js 和 Socket IO 进行开发。
什么是 Socket.IO?
Socket.io 是一种流行的 JavaScript 库,它允许我们在网络浏览器和服务器之间创建实时双向通信。它是一个高性能、高可靠性的库,设计用于处理大量数据。它遵循 WebSocket 协议并提供更好的功能,使我们能够构建高效的实时应用程序。
发布-订阅设计模式
发布-订阅(Publish-Subscribe)又称发布/订阅(pub/sub),是客户端与服务器之间实时通信的常用设计模式。它允许服务器通过通道/媒介实时向客户端发送消息。这些信息的发送者(发布者)并不明确标识目标接收者。相反,信息是通过一个信道发送的,信道上可能有任意数量的接收者(订阅者)在等待这些信息。
发布者: 负责为目标受众生成事件/信息。
订阅者: 收听事件或接收发布者信息的受众。
通道: 进行数据交换的媒介。
实时竞价是如何运作的?
我们已经看到了多个在线竞价平台,但竞价过程是如何在幕后进行的?让我们来看看发生在后台的事件:
- 假设两个用户 A 和 B 同时访问一个在线拍卖平台。
- 一旦他们表现出参与竞拍的意图,网络浏览器(客户端)就会订阅这两个用户,以监听 “出价 “事件。
- 另一个用户 C 同时出价,出价请求到达服务器。
- 服务器处理请求,然后向所有订阅者发布 “出价 “事件。
- 用户 A 和 B 都订阅了 “出价 “事件,因此他们会收到服务器发送的事件,出价金额也会实时更新。
- 这样,所有访问平台的用户都能看到实时竞价,甚至无需刷新网页浏览器即可参与竞价。
并发管理
实时竞价的关键之一是管理并发事务。让我们举一个简单的例子来理解这种情况:三个用户 A、B 和 C 在一次在线拍卖中相互竞争。
用户 A 首先出价 5 万美元,处于领先地位,现在用户 B 和 C 想要领先,同时出价 6 万美元。理想情况下,先到达服务器的出价必须被接受,而另一个出价必须被拒绝。
使用链接列表可以轻松解决并发事务的问题。对于每次拍卖,出价都将以链接列表的形式进行,因此每个出价都与最后一次出价相连,并带有特殊的唯一性约束,即链接列表中的所有父节点都必须是唯一的。
让我们重温一下这个例子,看看链接列表是如何解决并发交易问题的。用户 A 最初出价开始拍卖时,该出价成为链接列表的首节点,例如 id – 5。现在,用户 B 和 C 的父出价都是 5,当他们同时出价时,由于两个事务的父出价相同,最先到达服务器的出价将被处理,而另一个出价将因父出价唯一性约束而被拒绝。
执行
1. 创建一个新的NestJs项目并安装所需的依赖项:
$ npm i -g @nestjs/cli
$ nest new bidding-engine
$ npm i socket.io
$ npm i ioredis
$ npm i –save sequelize sequelize-typescript mysql2
$ npm I –save-dev @types/sequelize
2. 创建Redis适配器来初始化pub/sub客户端
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;
constructor(
app: INestApplication,
private readonly configService: ConfigService,
) {
super(app);
}
async connectToRedis(): Promise<void> {
const redisConfig = this.configService.get('redis');
let publish: Redis | Cluster = new Redis({
host: host,
port: port,
});
const subscribe = publish.duplicate();
publish.on('error', (error) => {
console.log('redis connection failed: ', error);
});
subscribe.on('error', (error) => {
console.log('redis connection failed: ', error);
});
this.adapterConstructor = createAdapter(publish, subscribe);
}
createIOServer(port: number, options?: ServerOptions) {
const server: Server = super.createIOServer(port, options) as Server;
server.adapter(this.adapterConstructor);
const webSocketConfig = this.configService.get<any>('webSocket');
const timeout: number =
webSocketConfig?.websocketHearthbeatTimeout || 30000;
setInterval(() => {
const clients: Map<string, Socket> = server.sockets.sockets;
Object.keys(clients).forEach((socketId) => {
const socket: Socket = clients[socketId] as Socket;
if (socket.connected) {
socket.send('ping');
}
});
}, timeout);
server.on('connection', (socket) => {
socket.on('message', (message: string) => {
if (message === 'pong') {
const pingTimeout = socket['pingTimeout'] as { refresh: () => void };
pingTimeout.refresh();
}
});
});
return server;
}
}
3. 创建适配器后,我们将适配器集成到应用程序中以初始化 Redis 和 pub/sub 客户端
const redisIoAdapter = new RedisIoAdapter(app, configService);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);
4. 创建一个网关,允许用户订阅事件并在出价后立即发布事件。
@WebSocketGateway({
cors: true,
namespace: 'place-bid',
transports: ['websocket'],
})
export class AuctionGateway {
@WebSocketServer()
private readonly server: Server;
@SubscribeMessage('place-bid-join')
async joinRoom(client: Socket, roomId: string): Promise<void> {
await client.join(roomId);
}
@SubscribeMessage('place-bid-leave')
async leaveRoom(client: Socket, roomId: string): Promise<void> {
await client.leave(roomId);
}
public placeBidToRoom(roomId: string, payload: any): void {
this.server.to(roomId).emit('new-bid-placed', payload);
}
}
5. 适配器和网关都已就位,现在我们要创建一个 API 端点,它将响应即将到来的出价(获取拍卖 ID 和出价金额),在数据库中插入出价,检查并发性,并发布出价事件。我们将创建不同的类来处理各自的逻辑,如下所示:
Bid Controller 类将作为下达新竞价的端点。
投标服务类将处理业务逻辑和相应的验证。
竞标存储库类将处理竞标的插入,并提供上一次提交的竞标。
投标基本模型代表投标表的数据库视图。
@Controller()
export class BidController {
constructor(
private readonly bidService: BidService,
private readonly appGateway: AuctionGateway,
) {}
@Post('bid')
@HttpCode(HttpStatus.OK)
async placeBid(@Body() request: { auctionId: string; bidAmount: number, parentBidId: string | null }) {
const response = (await this.buyerService.placeBid(
auctionId,
bidAmount,
parentBidId,
)) as any;
const { bidDetails, auctionId } = response;
const wsResponse: any = {
amount: bidDetails.value || 0,
auctionId,
bidId: bidDetails.id || '',
createdAt: bidDetails.bidTime,
};
this.appGateway.placeBidToRoom(request.auctionId, wsResponse);
return {
success: true,
};
}
}
@Injectable()
export class BidService {
constructor(
private readonly sequelize: Sequelize,
private readonly bidRepository: BidRepository,
) {}
async placeBid(auctionId: string,
bidAmount: number,
parentBidId: string | null) {
try {
const bidCreated = await this.sequelize.transaction(
async (transaction) =>
this.bidRepository.create(
{
bidAmount,
parentBid: parentBidId,
auctionId,
},
transaction,
),
);
const response = {
auctionId,
bidDetails: {
bidTime: new Date(),
id: bidCreated.id,
value: bidCreated.bidAmount,
},
success: true,
};
return response;
} catch (error) {
if (
error instanceof ValidationError &&
error.name === 'SequelizeUniqueConstraintError'
) {
const exception = {
error: 'You have been outbid',
details: {},
};
throw new ConflictException(exception);
}
if (error instanceof HttpException) {
throw error;
}
throw new InternalServerErrorException(error);
}
}
}
@Injectable()
export class BidRepository {
constructor(@InjectModel(BidBase) private bidModel: typeof BidBase) {}
create(bid: Partial<BidBase>, transaction: Transaction) {
const data: Optional<BidBase, never> = {
...(bid as Required<BidBase>),
};
return this.bidModel.create(data, { transaction });
}
public async findLastBid(auctionId: string): Promise<BidBase | null> {
return this.bidModel.findOne({
where: {
auctionId,
},
order: [['created_at', 'DESC']],
});
}
}
type CreationColumns = 'auctionId' | 'bidAmount' | 'parentBid';
export type CreateBidParams = Pick<BidBase, CreationColumns>;
@Table({
tableName: 'tbl_bid',
underscored: true,
})
export class BidBase extends Model<BidBase, CreateBidParams> {
@PrimaryKey
@IsUUID('all')
@Default(Sequelize.literal('NewId()'))
@Column
id!: string;
@Column
auctionId!: string;
@Column({ allowNull: true })
parentBid?: string;
@Column(DataType.DECIMAL(10, 2))
bidAmount!: number;
@CreatedAt
createdAt!: Date;
@UpdatedAt
updatedAt?: Date;
@DeletedAt
deletedAt?: Date;
@Column
createdBy?: string;
@Column
updatedBy?: string;
@Column
deletedBy?: string;
}
6. 在 parentBid 和 auctionId 上创建唯一群集索引,以防止并发交易问题。此外,在 parentBid 上创建一个外键,引用竞标基础模型的 id,以创建链接列表结构。
7. 最后,将竞价控制器集成到主应用程序中,现在您的终端就可以处理竞价并向用户发布事件了。
好极了,我们成功地使用 NestJs 开发了一个实时竞价引擎。您可以集成一个花哨的用户界面来监听竞价事件,提供用户登录,并与您的朋友竞相赢得拍卖。您还可以创建管理仪表板,查看即将发生的竞拍,在竞拍过程中拒绝竞拍,并宣布获胜者 “Goin 一次,Goin 两次,您是赢家 …..”。
作者:Asmit Bajaj
编译自medium.
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/33843.html